001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Comparator;
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
036import org.apache.hadoop.hbase.ScheduledChore;
037import org.apache.hadoop.hbase.Stoppable;
038import org.apache.hadoop.hbase.util.FutureUtils;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
045import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
046import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048
049/**
050 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
051 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
052 */
053@InterfaceAudience.Private
054public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
055
  /** Logger used by this chore for all of its diagnostics. */
  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  /** Processor count reported by the JVM, read once when this class is initialized. */
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
058
  /**
   * Configuration key for the size of the pool used to scan directories.
   * If the value is an integer >= 1, it is used as the size directly, capped at the number of
   * available processors;
   * if it is a decimal with 0.0 < size <= 1.0, the size is available processors * size
   * (and never less than 1).
   * Note that 1.0 is different from 1: the former means 100% of the cores are used,
   * while the latter means only 1 thread is used by the chore to scan directories.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  /** Fallback used by {@link #calculatePoolSize(String)} when the given value is unrecognized. */
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
067
  // Pool that executes the directory traversal tasks submitted by this chore; never null.
  private final DirScanPool pool;

  // File system handle used to list and delete paths.
  protected final FileSystem fs;
  // Root directory that every cleaning run traverses; the root itself is never deleted.
  private final Path oldFileDir;
  // Configuration used to look up the delegate class names and passed to each delegate.
  private final Configuration conf;
  // Extra parameters handed to each delegate's init(); null when the short constructor is used.
  protected final Map<String, Object> params;
  // When false, chore() logs that it is disabled and does no cleaning.
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  // Delegates consulted, in the order they were configured, to decide what may be deleted.
  protected List<T> cleanersChain;
076
077  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
078    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
079    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
080  }
081
082  /**
083   * @param name name of the chore being run
084   * @param sleepPeriod the period of time to sleep between each run
085   * @param s the stopper
086   * @param conf configuration to use
087   * @param fs handle to the FS
088   * @param oldFileDir the path to the archived files
089   * @param confKey configuration key for the classes to instantiate
090   * @param pool the thread pool used to scan directories
091   * @param params members could be used in cleaner
092   */
093  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
094    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
095    super(name, s, sleepPeriod);
096
097    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
098    this.pool = pool;
099    this.fs = fs;
100    this.oldFileDir = oldFileDir;
101    this.conf = conf;
102    this.params = params;
103    initCleanerChain(confKey);
104  }
105
106  /**
107   * Calculate size for cleaner pool.
108   * @param poolSize size from configuration
109   * @return size of pool after calculation
110   */
111  static int calculatePoolSize(String poolSize) {
112    if (poolSize.matches("[1-9][0-9]*")) {
113      // If poolSize is an integer, return it directly,
114      // but upmost to the number of available processors.
115      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
116      if (size == AVAIL_PROCESSORS) {
117        LOG.warn("Use full core processors to scan dir, size={}", size);
118      }
119      return size;
120    } else if (poolSize.matches("0.[0-9]+|1.0")) {
121      // if poolSize is a double, return poolSize * availableProcessors;
122      // Ensure that we always return at least one.
123      int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
124      if (computedThreads < 1) {
125        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
126        return 1;
127      }
128      return computedThreads;
129    } else {
130      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
131          ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
132      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
133    }
134  }
135
  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return {@code true} if the file is valid, {@code false} otherwise
   */
  protected abstract boolean validate(Path file);
143
144  /**
145   * Instantiate and initialize all the file cleaners set in the configuration
146   * @param confKey key to get the file cleaner classes from the configuration
147   */
148  private void initCleanerChain(String confKey) {
149    this.cleanersChain = new LinkedList<>();
150    String[] logCleaners = conf.getStrings(confKey);
151    if (logCleaners != null) {
152      for (String className: logCleaners) {
153        className = className.trim();
154        if (className.isEmpty()) {
155          continue;
156        }
157        T logCleaner = newFileCleaner(className, conf);
158        if (logCleaner != null) {
159          LOG.info("Initialize cleaner={}", className);
160          this.cleanersChain.add(logCleaner);
161        }
162      }
163    }
164  }
165
166  /**
167   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
168   * LogCleanerDelegate.
169   * @param className fully qualified class name of the LogCleanerDelegate
170   * @param conf used configuration
171   * @return the new instance
172   */
173  private T newFileCleaner(String className, Configuration conf) {
174    try {
175      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
176        FileCleanerDelegate.class);
177      @SuppressWarnings("unchecked")
178      T cleaner = (T) c.getDeclaredConstructor().newInstance();
179      cleaner.setConf(conf);
180      cleaner.init(this.params);
181      return cleaner;
182    } catch (Exception e) {
183      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
184      // skipping if can't instantiate
185      return null;
186    }
187  }
188
189  @Override
190  protected void chore() {
191    if (getEnabled()) {
192      try {
193        pool.latchCountUp();
194        if (runCleaner()) {
195          LOG.trace("Cleaned all WALs under {}", oldFileDir);
196        } else {
197          LOG.trace("WALs outstanding under {}", oldFileDir);
198        }
199      } finally {
200        pool.latchCountDown();
201      }
202      // After each cleaner chore, checks if received reconfigure notification while cleaning.
203      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
204      // This cleaner is waiting for other cleaners finishing their jobs.
205      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
206      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
207    } else {
208      LOG.trace("Cleaner chore disabled! Not cleaning.");
209    }
210  }
211
212  private void preRunCleaner() {
213    cleanersChain.forEach(FileCleanerDelegate::preClean);
214  }
215
216  public boolean runCleaner() {
217    preRunCleaner();
218    try {
219      CompletableFuture<Boolean> future = new CompletableFuture<>();
220      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
221      return future.get();
222    } catch (Exception e) {
223      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
224      return false;
225    }
226  }
227
228  /**
229   * Sort the given list in (descending) order of the space each element takes
230   * @param dirs the list to sort, element in it should be directory (not file)
231   */
232  private void sortByConsumedSpace(List<FileStatus> dirs) {
233    if (dirs == null || dirs.size() < 2) {
234      // no need to sort for empty or single directory
235      return;
236    }
237    dirs.sort(new Comparator<FileStatus>() {
238      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
239
240      @Override
241      public int compare(FileStatus f1, FileStatus f2) {
242        long f1ConsumedSpace = getSpace(f1);
243        long f2ConsumedSpace = getSpace(f2);
244        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
245      }
246
247      private long getSpace(FileStatus f) {
248        Long cached = directorySpaces.get(f);
249        if (cached != null) {
250          return cached;
251        }
252        try {
253          long space =
254              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
255          directorySpaces.put(f, space);
256          return space;
257        } catch (IOException e) {
258          LOG.trace("Failed to get space consumed by path={}", f, e);
259          return -1;
260        }
261      }
262    });
263  }
264
265  /**
266   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
267   * necessary.
268   * @param files List of FileStatus for the files to check (and possibly delete)
269   * @return true iff successfully deleted all files
270   */
271  private boolean checkAndDeleteFiles(List<FileStatus> files) {
272    if (files == null) {
273      return true;
274    }
275
276    // first check to see if the path is valid
277    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
278    List<FileStatus> invalidFiles = Lists.newArrayList();
279    for (FileStatus file : files) {
280      if (validate(file.getPath())) {
281        validFiles.add(file);
282      } else {
283        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
284        invalidFiles.add(file);
285      }
286    }
287
288    Iterable<FileStatus> deletableValidFiles = validFiles;
289    // check each of the cleaners for the valid files
290    for (T cleaner : cleanersChain) {
291      if (cleaner.isStopped() || this.getStopper().isStopped()) {
292        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
293            + this.oldFileDir);
294        return false;
295      }
296
297      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
298
299      // trace which cleaner is holding on to each file
300      if (LOG.isTraceEnabled()) {
301        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
302        for (FileStatus file : deletableValidFiles) {
303          if (!filteredFileSet.contains(file)) {
304            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
305          }
306        }
307      }
308
309      deletableValidFiles = filteredFiles;
310    }
311
312    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
313    return deleteFiles(filesToDelete) == files.size();
314  }
315
316  /**
317   * Check if a empty directory with no subdirs or subfiles can be deleted
318   * @param dir Path of the directory
319   * @return True if the directory can be deleted, otherwise false
320   */
321  private boolean isEmptyDirDeletable(Path dir) {
322    for (T cleaner : cleanersChain) {
323      if (cleaner.isStopped() || this.getStopper().isStopped()) {
324        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
325          this.getName(), dir);
326        return false;
327      }
328      if (!cleaner.isEmptyDirDeletable(dir)) {
329        // If one of the cleaner need the empty directory, skip delete it
330        return false;
331      }
332    }
333    return true;
334  }
335
336  /**
337   * Delete the given files
338   * @param filesToDelete files to delete
339   * @return number of deleted files
340   */
341  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
342    int deletedFileCount = 0;
343    for (FileStatus file : filesToDelete) {
344      Path filePath = file.getPath();
345      LOG.trace("Removing {} from archive", filePath);
346      try {
347        boolean success = this.fs.delete(filePath, false);
348        if (success) {
349          deletedFileCount++;
350        } else {
351          LOG.warn("Attempted to delete:" + filePath
352              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
353        }
354      } catch (IOException e) {
355        e = e instanceof RemoteException ?
356                  ((RemoteException)e).unwrapRemoteException() : e;
357        LOG.warn("Error while deleting: " + filePath, e);
358      }
359    }
360    return deletedFileCount;
361  }
362
363  @Override
364  public synchronized void cleanup() {
365    for (T lc : this.cleanersChain) {
366      try {
367        lc.stop("Exiting");
368      } catch (Throwable t) {
369        LOG.warn("Stopping", t);
370      }
371    }
372  }
373
374  int getChorePoolSize() {
375    return pool.getSize();
376  }
377
378  /**
379   * @param enabled
380   */
381  public boolean setEnabled(final boolean enabled) {
382    return this.enabled.getAndSet(enabled);
383  }
384
385  public boolean getEnabled() { return this.enabled.get();
386  }
387
388  private interface Action<T> {
389    T act() throws Exception;
390  }
391
  /**
   * Attempts to clean up a directory(its subdirectories, and files) in a
   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
   * calling result.get().
   * <p>
   * This method does not wait for the sub-directories: it submits one task per sub-directory to
   * the pool and registers a listener that completes {@code result} once all of them are done.
   * @param dir the directory to clean
   * @param root {@code true} if {@code dir} is the top-level directory of the run; such a
   *          directory is emptied but never deleted itself
   * @param result completed with {@code true} when all files and sub-directories were deleted,
   *          the cleaners allow removing the empty directory and (unless {@code root}) the
   *          directory itself was deleted; completed with {@code false} otherwise; completed
   *          exceptionally if listing the directory, a sub-directory task or the final step
   *          throws
   */
  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
    try {
      // Step.1: List all files under the given directory.
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
      List<FileStatus> subDirs =
          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

      // Step.2: Try to delete all the deletable files.
      // This runs synchronously on the current thread, before any sub-directory is submitted.
      boolean allFilesDeleted =
          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

      // Step.3: Start to traverse and delete the sub-directories.
      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      if (!subDirs.isEmpty()) {
        // Largest sub-directories are submitted first.
        sortByConsumedSpace(subDirs);
        // Submit the request of sub-directory deletion.
        subDirs.forEach(subDir -> {
          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
          futures.add(subFuture);
        });
      }

      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
      // current directory asynchronously.
      FutureUtils.addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (voidObj, e) -> {
          if (e != null) {
            // At least one sub-directory task failed; propagate the failure to our own caller.
            result.completeExceptionally(e);
            return;
          }
          try {
            // All sub-futures are already complete here, so join() does not block.
            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
            if (deleted && !root) {
              // If and only if files and sub-dirs under current dir are deleted successfully, and
              // the empty directory can be deleted, and it is not the root dir then task will
              // try to delete it.
              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
            }
            result.complete(deleted);
          } catch (Exception ie) {
            // Must handle the inner exception here, otherwise the result may get stuck if one
            // sub-directory get some failure.
            result.completeExceptionally(ie);
          }
        });
    } catch (Exception e) {
      // Covers failures before the listener is registered, e.g. listing the directory.
      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
      result.completeExceptionally(e);
    }
  }
452
453  /**
454   * Perform a delete on a specified type.
455   * @param deletion a delete
456   * @param type possible values are 'files', 'subdirs', 'dirs'
457   * @return true if it deleted successfully, false otherwise
458   */
459  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
460    boolean deleted;
461    try {
462      LOG.trace("Start deleting {} under {}", type, dir);
463      deleted = deletion.act();
464    } catch (PathIsNotEmptyDirectoryException exception) {
465      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
466      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
467      // message below.
468      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
469      deleted = false;
470    } catch (IOException ioe) {
471      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
472          + "happening, use following exception when asking on mailing list.",
473        type, dir, ioe);
474      deleted = false;
475    } catch (Exception e) {
476      LOG.info("unexpected exception: ", e);
477      deleted = false;
478    }
479    LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
480    return deleted;
481  }
482}