001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Comparator;
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.stream.Collectors;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
037import org.apache.hadoop.hbase.ScheduledChore;
038import org.apache.hadoop.hbase.Stoppable;
039import org.apache.hadoop.hbase.util.FutureUtils;
040import org.apache.hadoop.ipc.RemoteException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
048import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051/**
052 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
053 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
054 */
055@InterfaceAudience.Private
056public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
057
  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  // Processor count sampled once when the class is initialized; calculatePoolSize uses it both
  // as the upper bound for integer settings and as the multiplier for fractional settings.
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * Configuration key for the number of threads used to scan directories.
   * If it is an integer and >= 1, it would be the size (capped at the number of available
   * processors);
   * if 0.0 < size <= 1.0, size would be available processors * size.
   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
   * while latter will use only 1 thread for chore to scan dir.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  // Value calculatePoolSize falls back to when the configured one is not recognized.
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

  // Pool on which directory traversal tasks are executed; checked to be non-null on construction.
  private final DirScanPool pool;

  // File system on which listing and deletion are performed.
  protected final FileSystem fs;
  // Root directory this chore cleans; the root itself is never deleted by traverseAndDelete.
  private final Path oldFileDir;
  // Configuration used to look up the delegate class names and handed to every delegate.
  private final Configuration conf;
  // Extra parameters passed to each delegate's init(); may be null.
  protected final Map<String, Object> params;
  // When false, chore() skips the cleaning run.
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  // Delegates consulted in order by checkAndDeleteFiles; populated by initCleanerChain.
  protected List<T> cleanersChain;
078
  /**
   * Creates a chore that passes no extra parameters to its cleaner delegates; equivalent to
   * calling the nine-argument constructor with {@code params} set to {@code null}.
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
  }
083
084  /**
085   * @param name name of the chore being run
086   * @param sleepPeriod the period of time to sleep between each run
087   * @param s the stopper
088   * @param conf configuration to use
089   * @param fs handle to the FS
090   * @param oldFileDir the path to the archived files
091   * @param confKey configuration key for the classes to instantiate
092   * @param pool the thread pool used to scan directories
093   * @param params members could be used in cleaner
094   */
095  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
096    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
097    super(name, s, sleepPeriod);
098
099    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
100    this.pool = pool;
101    this.fs = fs;
102    this.oldFileDir = oldFileDir;
103    this.conf = conf;
104    this.params = params;
105    initCleanerChain(confKey);
106  }
107
108  /**
109   * Calculate size for cleaner pool.
110   * @param poolSize size from configuration
111   * @return size of pool after calculation
112   */
113  static int calculatePoolSize(String poolSize) {
114    if (poolSize.matches("[1-9][0-9]*")) {
115      // If poolSize is an integer, return it directly,
116      // but upmost to the number of available processors.
117      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
118      if (size == AVAIL_PROCESSORS) {
119        LOG.warn("Use full core processors to scan dir, size={}", size);
120      }
121      return size;
122    } else if (poolSize.matches("0.[0-9]+|1.0")) {
123      // if poolSize is a double, return poolSize * availableProcessors;
124      // Ensure that we always return at least one.
125      int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
126      if (computedThreads < 1) {
127        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
128        return 1;
129      }
130      return computedThreads;
131    } else {
132      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
133          ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
134      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
135    }
136  }
137
  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return {@code true} if the file is valid, {@code false} otherwise
   */
  protected abstract boolean validate(Path file);
145
146  /**
147   * Instantiate and initialize all the file cleaners set in the configuration
148   * @param confKey key to get the file cleaner classes from the configuration
149   */
150  private void initCleanerChain(String confKey) {
151    this.cleanersChain = new LinkedList<>();
152    String[] logCleaners = conf.getStrings(confKey);
153    if (logCleaners != null) {
154      for (String className : logCleaners) {
155        T logCleaner = newFileCleaner(className, conf);
156        if (logCleaner != null) {
157          LOG.info("Initialize cleaner={}", className);
158          this.cleanersChain.add(logCleaner);
159        }
160      }
161    }
162  }
163
164  /**
165   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
166   * LogCleanerDelegate.
167   * @param className fully qualified class name of the LogCleanerDelegate
168   * @param conf used configuration
169   * @return the new instance
170   */
171  private T newFileCleaner(String className, Configuration conf) {
172    try {
173      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
174        FileCleanerDelegate.class);
175      @SuppressWarnings("unchecked")
176      T cleaner = (T) c.getDeclaredConstructor().newInstance();
177      cleaner.setConf(conf);
178      cleaner.init(this.params);
179      return cleaner;
180    } catch (Exception e) {
181      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
182      // skipping if can't instantiate
183      return null;
184    }
185  }
186
187  @Override
188  protected void chore() {
189    if (getEnabled()) {
190      try {
191        pool.latchCountUp();
192        if (runCleaner()) {
193          LOG.trace("Cleaned all WALs under {}", oldFileDir);
194        } else {
195          LOG.trace("WALs outstanding under {}", oldFileDir);
196        }
197      } finally {
198        pool.latchCountDown();
199      }
200      // After each cleaner chore, checks if received reconfigure notification while cleaning.
201      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
202      // This cleaner is waiting for other cleaners finishing their jobs.
203      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
204      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
205    } else {
206      LOG.trace("Cleaner chore disabled! Not cleaning.");
207    }
208  }
209
210  private void preRunCleaner() {
211    cleanersChain.forEach(FileCleanerDelegate::preClean);
212  }
213
214  public boolean runCleaner() {
215    preRunCleaner();
216    try {
217      CompletableFuture<Boolean> future = new CompletableFuture<>();
218      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
219      return future.get();
220    } catch (Exception e) {
221      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
222      return false;
223    }
224  }
225
226  /**
227   * Sort the given list in (descending) order of the space each element takes
228   * @param dirs the list to sort, element in it should be directory (not file)
229   */
230  private void sortByConsumedSpace(List<FileStatus> dirs) {
231    if (dirs == null || dirs.size() < 2) {
232      // no need to sort for empty or single directory
233      return;
234    }
235    dirs.sort(new Comparator<FileStatus>() {
236      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
237
238      @Override
239      public int compare(FileStatus f1, FileStatus f2) {
240        long f1ConsumedSpace = getSpace(f1);
241        long f2ConsumedSpace = getSpace(f2);
242        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
243      }
244
245      private long getSpace(FileStatus f) {
246        Long cached = directorySpaces.get(f);
247        if (cached != null) {
248          return cached;
249        }
250        try {
251          long space =
252              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
253          directorySpaces.put(f, space);
254          return space;
255        } catch (IOException e) {
256          LOG.trace("Failed to get space consumed by path={}", f, e);
257          return -1;
258        }
259      }
260    });
261  }
262
263  /**
264   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
265   * necessary.
266   * @param files List of FileStatus for the files to check (and possibly delete)
267   * @return true iff successfully deleted all files
268   */
269  private boolean checkAndDeleteFiles(List<FileStatus> files) {
270    if (files == null) {
271      return true;
272    }
273
274    // first check to see if the path is valid
275    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
276    List<FileStatus> invalidFiles = Lists.newArrayList();
277    for (FileStatus file : files) {
278      if (validate(file.getPath())) {
279        validFiles.add(file);
280      } else {
281        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
282        invalidFiles.add(file);
283      }
284    }
285
286    Iterable<FileStatus> deletableValidFiles = validFiles;
287    // check each of the cleaners for the valid files
288    for (T cleaner : cleanersChain) {
289      if (cleaner.isStopped() || this.getStopper().isStopped()) {
290        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
291            + this.oldFileDir);
292        return false;
293      }
294
295      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
296
297      // trace which cleaner is holding on to each file
298      if (LOG.isTraceEnabled()) {
299        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
300        for (FileStatus file : deletableValidFiles) {
301          if (!filteredFileSet.contains(file)) {
302            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
303          }
304        }
305      }
306
307      deletableValidFiles = filteredFiles;
308    }
309
310    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
311    return deleteFiles(filesToDelete) == files.size();
312  }
313
314  /**
315   * Delete the given files
316   * @param filesToDelete files to delete
317   * @return number of deleted files
318   */
319  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
320    int deletedFileCount = 0;
321    for (FileStatus file : filesToDelete) {
322      Path filePath = file.getPath();
323      LOG.trace("Removing {} from archive", filePath);
324      try {
325        boolean success = this.fs.delete(filePath, false);
326        if (success) {
327          deletedFileCount++;
328        } else {
329          LOG.warn("Attempted to delete:" + filePath
330              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
331        }
332      } catch (IOException e) {
333        e = e instanceof RemoteException ?
334                  ((RemoteException)e).unwrapRemoteException() : e;
335        LOG.warn("Error while deleting: " + filePath, e);
336      }
337    }
338    return deletedFileCount;
339  }
340
341  @Override
342  public synchronized void cleanup() {
343    for (T lc : this.cleanersChain) {
344      try {
345        lc.stop("Exiting");
346      } catch (Throwable t) {
347        LOG.warn("Stopping", t);
348      }
349    }
350  }
351
352  @VisibleForTesting
353  int getChorePoolSize() {
354    return pool.getSize();
355  }
356
357  /**
358   * @param enabled
359   */
360  public boolean setEnabled(final boolean enabled) {
361    return this.enabled.getAndSet(enabled);
362  }
363
364  public boolean getEnabled() { return this.enabled.get();
365  }
366
367  private interface Action<T> {
368    T act() throws Exception;
369  }
370
371  /**
372   * Attempts to clean up a directory(its subdirectories, and files) in a
373   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
374   * calling result.get().
375   */
376  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
377    try {
378      // Step.1: List all files under the given directory.
379      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
380      List<FileStatus> subDirs =
381          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
382      List<FileStatus> files =
383          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
384
385      // Step.2: Try to delete all the deletable files.
386      boolean allFilesDeleted =
387          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
388
389      // Step.3: Start to traverse and delete the sub-directories.
390      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
391      if (!subDirs.isEmpty()) {
392        sortByConsumedSpace(subDirs);
393        // Submit the request of sub-directory deletion.
394        subDirs.forEach(subDir -> {
395          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
396          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
397          futures.add(subFuture);
398        });
399      }
400
401      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
402      // current directory asynchronously.
403      FutureUtils.addListener(
404        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
405        (voidObj, e) -> {
406          if (e != null) {
407            result.completeExceptionally(e);
408            return;
409          }
410          try {
411            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
412            boolean deleted = allFilesDeleted && allSubDirsDeleted;
413            if (deleted && !root) {
414              // If and only if files and sub-dirs under current dir are deleted successfully, and
415              // the empty directory can be deleted, and it is not the root dir then task will
416              // try to delete it.
417              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
418            }
419            result.complete(deleted);
420          } catch (Exception ie) {
421            // Must handle the inner exception here, otherwise the result may get stuck if one
422            // sub-directory get some failure.
423            result.completeExceptionally(ie);
424          }
425        });
426    } catch (Exception e) {
427      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
428      result.completeExceptionally(e);
429    }
430  }
431
432  /**
433   * Perform a delete on a specified type.
434   * @param deletion a delete
435   * @param type possible values are 'files', 'subdirs', 'dirs'
436   * @return true if it deleted successfully, false otherwise
437   */
438  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
439    boolean deleted;
440    try {
441      LOG.trace("Start deleting {} under {}", type, dir);
442      deleted = deletion.act();
443    } catch (PathIsNotEmptyDirectoryException exception) {
444      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
445      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
446      // message below.
447      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
448      deleted = false;
449    } catch (IOException ioe) {
450      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
451          + "happening, use following exception when asking on mailing list.",
452        type, dir, ioe);
453      deleted = false;
454    } catch (Exception e) {
455      LOG.info("unexpected exception: ", e);
456      deleted = false;
457    }
458    LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
459    return deleted;
460  }
461}