001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Comparator;
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.stream.Collectors;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
037import org.apache.hadoop.hbase.ScheduledChore;
038import org.apache.hadoop.hbase.Stoppable;
039import org.apache.hadoop.hbase.util.FutureUtils;
040import org.apache.hadoop.ipc.RemoteException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
048import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051/**
052 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
053 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
054 */
055@InterfaceAudience.Private
056public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
057
058  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
059  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
060
061  /**
062   * If it is an integer and >= 1, it would be the size;
063   * if 0.0 < size <= 1.0, size would be available processors * size.
064   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
065   * while latter will use only 1 thread for chore to scan dir.
066   */
067  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
068  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
069
070  private final DirScanPool pool;
071
072  protected final FileSystem fs;
073  private final Path oldFileDir;
074  private final Configuration conf;
075  protected final Map<String, Object> params;
076  private final AtomicBoolean enabled = new AtomicBoolean(true);
077  protected List<T> cleanersChain;
078
079  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
080    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
081    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
082  }
083
084  /**
085   * @param name name of the chore being run
086   * @param sleepPeriod the period of time to sleep between each run
087   * @param s the stopper
088   * @param conf configuration to use
089   * @param fs handle to the FS
090   * @param oldFileDir the path to the archived files
091   * @param confKey configuration key for the classes to instantiate
092   * @param pool the thread pool used to scan directories
093   * @param params members could be used in cleaner
094   */
095  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
096    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
097    super(name, s, sleepPeriod);
098
099    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
100    this.pool = pool;
101    this.fs = fs;
102    this.oldFileDir = oldFileDir;
103    this.conf = conf;
104    this.params = params;
105    initCleanerChain(confKey);
106  }
107
108  /**
109   * Calculate size for cleaner pool.
110   * @param poolSize size from configuration
111   * @return size of pool after calculation
112   */
113  static int calculatePoolSize(String poolSize) {
114    if (poolSize.matches("[1-9][0-9]*")) {
115      // If poolSize is an integer, return it directly,
116      // but upmost to the number of available processors.
117      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
118      if (size == AVAIL_PROCESSORS) {
119        LOG.warn("Use full core processors to scan dir, size={}", size);
120      }
121      return size;
122    } else if (poolSize.matches("0.[0-9]+|1.0")) {
123      // if poolSize is a double, return poolSize * availableProcessors;
124      // Ensure that we always return at least one.
125      int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
126      if (computedThreads < 1) {
127        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
128        return 1;
129      }
130      return computedThreads;
131    } else {
132      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
133          ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
134      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
135    }
136  }
137
138  /**
139   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
140   * will go through the cleaner delegates, but otherwise the file is just deleted.
141   * @param file full {@link Path} of the file to be checked
142   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
143   */
144  protected abstract boolean validate(Path file);
145
146  /**
147   * Instantiate and initialize all the file cleaners set in the configuration
148   * @param confKey key to get the file cleaner classes from the configuration
149   */
150  private void initCleanerChain(String confKey) {
151    this.cleanersChain = new LinkedList<>();
152    String[] logCleaners = conf.getStrings(confKey);
153    if (logCleaners != null) {
154      for (String className : logCleaners) {
155        T logCleaner = newFileCleaner(className, conf);
156        if (logCleaner != null) {
157          LOG.info("Initialize cleaner={}", className);
158          this.cleanersChain.add(logCleaner);
159        }
160      }
161    }
162  }
163
164  /**
165   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
166   * LogCleanerDelegate.
167   * @param className fully qualified class name of the LogCleanerDelegate
168   * @param conf used configuration
169   * @return the new instance
170   */
171  private T newFileCleaner(String className, Configuration conf) {
172    try {
173      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
174        FileCleanerDelegate.class);
175      @SuppressWarnings("unchecked")
176      T cleaner = (T) c.getDeclaredConstructor().newInstance();
177      cleaner.setConf(conf);
178      cleaner.init(this.params);
179      return cleaner;
180    } catch (Exception e) {
181      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
182      // skipping if can't instantiate
183      return null;
184    }
185  }
186
187  @Override
188  protected void chore() {
189    if (getEnabled()) {
190      try {
191        pool.latchCountUp();
192        if (runCleaner()) {
193          LOG.trace("Cleaned all WALs under {}", oldFileDir);
194        } else {
195          LOG.trace("WALs outstanding under {}", oldFileDir);
196        }
197      } finally {
198        pool.latchCountDown();
199      }
200      // After each cleaner chore, checks if received reconfigure notification while cleaning.
201      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
202      // This cleaner is waiting for other cleaners finishing their jobs.
203      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
204      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
205    } else {
206      LOG.trace("Cleaner chore disabled! Not cleaning.");
207    }
208  }
209
210  private void preRunCleaner() {
211    cleanersChain.forEach(FileCleanerDelegate::preClean);
212  }
213
214  public boolean runCleaner() {
215    preRunCleaner();
216    try {
217      CompletableFuture<Boolean> future = new CompletableFuture<>();
218      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
219      return future.get();
220    } catch (Exception e) {
221      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
222      return false;
223    }
224  }
225
226  /**
227   * Sort the given list in (descending) order of the space each element takes
228   * @param dirs the list to sort, element in it should be directory (not file)
229   */
230  private void sortByConsumedSpace(List<FileStatus> dirs) {
231    if (dirs == null || dirs.size() < 2) {
232      // no need to sort for empty or single directory
233      return;
234    }
235    dirs.sort(new Comparator<FileStatus>() {
236      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
237
238      @Override
239      public int compare(FileStatus f1, FileStatus f2) {
240        long f1ConsumedSpace = getSpace(f1);
241        long f2ConsumedSpace = getSpace(f2);
242        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
243      }
244
245      private long getSpace(FileStatus f) {
246        Long cached = directorySpaces.get(f);
247        if (cached != null) {
248          return cached;
249        }
250        try {
251          long space =
252              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
253          directorySpaces.put(f, space);
254          return space;
255        } catch (IOException e) {
256          LOG.trace("Failed to get space consumed by path={}", f, e);
257          return -1;
258        }
259      }
260    });
261  }
262
263  /**
264   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
265   * necessary.
266   * @param files List of FileStatus for the files to check (and possibly delete)
267   * @return true iff successfully deleted all files
268   */
269  private boolean checkAndDeleteFiles(List<FileStatus> files) {
270    if (files == null) {
271      return true;
272    }
273
274    // first check to see if the path is valid
275    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
276    List<FileStatus> invalidFiles = Lists.newArrayList();
277    for (FileStatus file : files) {
278      if (validate(file.getPath())) {
279        validFiles.add(file);
280      } else {
281        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
282        invalidFiles.add(file);
283      }
284    }
285
286    Iterable<FileStatus> deletableValidFiles = validFiles;
287    // check each of the cleaners for the valid files
288    for (T cleaner : cleanersChain) {
289      if (cleaner.isStopped() || this.getStopper().isStopped()) {
290        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
291            + this.oldFileDir);
292        return false;
293      }
294
295      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
296
297      // trace which cleaner is holding on to each file
298      if (LOG.isTraceEnabled()) {
299        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
300        for (FileStatus file : deletableValidFiles) {
301          if (!filteredFileSet.contains(file)) {
302            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
303          }
304        }
305      }
306
307      deletableValidFiles = filteredFiles;
308    }
309
310    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
311    return deleteFiles(filesToDelete) == files.size();
312  }
313
314  /**
315   * Check if a empty directory with no subdirs or subfiles can be deleted
316   * @param dir Path of the directory
317   * @return True if the directory can be deleted, otherwise false
318   */
319  private boolean isEmptyDirDeletable(Path dir) {
320    for (T cleaner : cleanersChain) {
321      if (cleaner.isStopped() || this.getStopper().isStopped()) {
322        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
323          this.getName(), dir);
324        return false;
325      }
326      if (!cleaner.isEmptyDirDeletable(dir)) {
327        // If one of the cleaner need the empty directory, skip delete it
328        return false;
329      }
330    }
331    return true;
332  }
333
334  /**
335   * Delete the given files
336   * @param filesToDelete files to delete
337   * @return number of deleted files
338   */
339  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
340    int deletedFileCount = 0;
341    for (FileStatus file : filesToDelete) {
342      Path filePath = file.getPath();
343      LOG.trace("Removing {} from archive", filePath);
344      try {
345        boolean success = this.fs.delete(filePath, false);
346        if (success) {
347          deletedFileCount++;
348        } else {
349          LOG.warn("Attempted to delete:" + filePath
350              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
351        }
352      } catch (IOException e) {
353        e = e instanceof RemoteException ?
354                  ((RemoteException)e).unwrapRemoteException() : e;
355        LOG.warn("Error while deleting: " + filePath, e);
356      }
357    }
358    return deletedFileCount;
359  }
360
361  @Override
362  public synchronized void cleanup() {
363    for (T lc : this.cleanersChain) {
364      try {
365        lc.stop("Exiting");
366      } catch (Throwable t) {
367        LOG.warn("Stopping", t);
368      }
369    }
370  }
371
372  @VisibleForTesting
373  int getChorePoolSize() {
374    return pool.getSize();
375  }
376
377  /**
378   * @param enabled
379   */
380  public boolean setEnabled(final boolean enabled) {
381    return this.enabled.getAndSet(enabled);
382  }
383
384  public boolean getEnabled() { return this.enabled.get();
385  }
386
387  private interface Action<T> {
388    T act() throws Exception;
389  }
390
391  /**
392   * Attempts to clean up a directory(its subdirectories, and files) in a
393   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
394   * calling result.get().
395   */
396  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
397    try {
398      // Step.1: List all files under the given directory.
399      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
400      List<FileStatus> subDirs =
401          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
402      List<FileStatus> files =
403          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
404
405      // Step.2: Try to delete all the deletable files.
406      boolean allFilesDeleted =
407          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
408
409      // Step.3: Start to traverse and delete the sub-directories.
410      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
411      if (!subDirs.isEmpty()) {
412        sortByConsumedSpace(subDirs);
413        // Submit the request of sub-directory deletion.
414        subDirs.forEach(subDir -> {
415          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
416          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
417          futures.add(subFuture);
418        });
419      }
420
421      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
422      // current directory asynchronously.
423      FutureUtils.addListener(
424        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
425        (voidObj, e) -> {
426          if (e != null) {
427            result.completeExceptionally(e);
428            return;
429          }
430          try {
431            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
432            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
433            if (deleted && !root) {
434              // If and only if files and sub-dirs under current dir are deleted successfully, and
435              // the empty directory can be deleted, and it is not the root dir then task will
436              // try to delete it.
437              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
438            }
439            result.complete(deleted);
440          } catch (Exception ie) {
441            // Must handle the inner exception here, otherwise the result may get stuck if one
442            // sub-directory get some failure.
443            result.completeExceptionally(ie);
444          }
445        });
446    } catch (Exception e) {
447      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
448      result.completeExceptionally(e);
449    }
450  }
451
452  /**
453   * Perform a delete on a specified type.
454   * @param deletion a delete
455   * @param type possible values are 'files', 'subdirs', 'dirs'
456   * @return true if it deleted successfully, false otherwise
457   */
458  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
459    boolean deleted;
460    try {
461      LOG.trace("Start deleting {} under {}", type, dir);
462      deleted = deletion.act();
463    } catch (PathIsNotEmptyDirectoryException exception) {
464      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
465      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
466      // message below.
467      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
468      deleted = false;
469    } catch (IOException ioe) {
470      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
471          + "happening, use following exception when asking on mailing list.",
472        type, dir, ioe);
473      deleted = false;
474    } catch (Exception e) {
475      LOG.info("unexpected exception: ", e);
476      deleted = false;
477    }
478    LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
479    return deleted;
480  }
481}