001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ForkJoinPool;
028import java.util.concurrent.RecursiveTask;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
035import org.apache.hadoop.hbase.ScheduledChore;
036import org.apache.hadoop.hbase.Stoppable;
037import org.apache.hadoop.hbase.conf.ConfigurationObserver;
038import org.apache.hadoop.hbase.util.FSUtils;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
047import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
048import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
049
050/**
051 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
052 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
053 */
054@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
055    justification="TODO: Fix. It is wonky have static pool initialized from instance")
056@InterfaceAudience.Private
057public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
058    implements ConfigurationObserver {
059
060  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
061  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
062
063  /**
064   * If it is an integer and >= 1, it would be the size;
065   * if 0.0 < size <= 1.0, size would be available processors * size.
066   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
067   * while latter will use only 1 thread for chore to scan dir.
068   */
069  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
070  private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
071
072  // It may be waste resources for each cleaner chore own its pool,
073  // so let's make pool for all cleaner chores.
074  private static volatile ForkJoinPool CHOREPOOL;
075  private static volatile int CHOREPOOLSIZE;
076
077  protected final FileSystem fs;
078  private final Path oldFileDir;
079  private final Configuration conf;
080  protected final Map<String, Object> params;
081  private final AtomicBoolean enabled = new AtomicBoolean(true);
082  private final AtomicBoolean reconfig = new AtomicBoolean(false);
083  protected List<T> cleanersChain;
084
085  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
086                      FileSystem fs, Path oldFileDir, String confKey) {
087    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
088  }
089
090  /**
091   * @param name name of the chore being run
092   * @param sleepPeriod the period of time to sleep between each run
093   * @param s the stopper
094   * @param conf configuration to use
095   * @param fs handle to the FS
096   * @param oldFileDir the path to the archived files
097   * @param confKey configuration key for the classes to instantiate
098   * @param params members could be used in cleaner
099   */
100  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
101      FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
102    super(name, s, sleepPeriod);
103    this.fs = fs;
104    this.oldFileDir = oldFileDir;
105    this.conf = conf;
106    this.params = params;
107    initCleanerChain(confKey);
108
109    if (CHOREPOOL == null) {
110      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
111      CHOREPOOLSIZE = calculatePoolSize(poolSize);
112      // poolSize may be 0 or 0.0 from a careless configuration,
113      // double check to make sure.
114      CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
115      this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
116      LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
117    }
118  }
119
120  /**
121   * Calculate size for cleaner pool.
122   * @param poolSize size from configuration
123   * @return size of pool after calculation
124   */
125  static int calculatePoolSize(String poolSize) {
126    if (poolSize.matches("[1-9][0-9]*")) {
127      // If poolSize is an integer, return it directly,
128      // but upmost to the number of available processors.
129      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
130      if (size == AVAIL_PROCESSORS) {
131        LOG.warn("Use full core processors to scan dir, size={}", size);
132      }
133      return size;
134    } else if (poolSize.matches("0.[0-9]+|1.0")) {
135      // if poolSize is a double, return poolSize * availableProcessors;
136      // Ensure that we always return at least one.
137      int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
138      if (computedThreads < 1) {
139        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
140        return 1;
141      }
142      return computedThreads;
143    } else {
144      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
145          ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
146      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
147    }
148  }
149
150  /**
151   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
152   * will go through the cleaner delegates, but otherwise the file is just deleted.
153   * @param file full {@link Path} of the file to be checked
154   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
155   */
156  protected abstract boolean validate(Path file);
157
158  /**
159   * Instantiate and initialize all the file cleaners set in the configuration
160   * @param confKey key to get the file cleaner classes from the configuration
161   */
162  private void initCleanerChain(String confKey) {
163    this.cleanersChain = new LinkedList<>();
164    String[] logCleaners = conf.getStrings(confKey);
165    if (logCleaners != null) {
166      for (String className : logCleaners) {
167        T logCleaner = newFileCleaner(className, conf);
168        if (logCleaner != null) {
169          LOG.debug("Initialize cleaner={}", className);
170          this.cleanersChain.add(logCleaner);
171        }
172      }
173    }
174  }
175
176  @Override
177  public void onConfigurationChange(Configuration conf) {
178    int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
179    if (updatedSize == CHOREPOOLSIZE) {
180      LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
181      return;
182    }
183    CHOREPOOLSIZE = updatedSize;
184    if (CHOREPOOL.getPoolSize() == 0) {
185      // Chore does not work now, update it directly.
186      updateChorePoolSize(updatedSize);
187      return;
188    }
189    // Chore is working, update it after chore finished.
190    reconfig.set(true);
191  }
192
193  private void updateChorePoolSize(int updatedSize) {
194    CHOREPOOL.shutdownNow();
195    LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
196    CHOREPOOL = new ForkJoinPool(updatedSize);
197  }
198
199  /**
200   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
201   * LogCleanerDelegate.
202   * @param className fully qualified class name of the LogCleanerDelegate
203   * @param conf used configuration
204   * @return the new instance
205   */
206  private T newFileCleaner(String className, Configuration conf) {
207    try {
208      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
209        FileCleanerDelegate.class);
210      @SuppressWarnings("unchecked")
211      T cleaner = (T) c.getDeclaredConstructor().newInstance();
212      cleaner.setConf(conf);
213      cleaner.init(this.params);
214      return cleaner;
215    } catch (Exception e) {
216      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
217      // skipping if can't instantiate
218      return null;
219    }
220  }
221
222  @Override
223  protected void chore() {
224    if (getEnabled()) {
225      if (runCleaner()) {
226        LOG.trace("Cleaned all WALs under {}", oldFileDir);
227      } else {
228        LOG.trace("WALs outstanding under {}", oldFileDir);
229      }
230      // After each clean chore, checks if receives reconfigure notification while cleaning
231      if (reconfig.compareAndSet(true, false)) {
232        updateChorePoolSize(CHOREPOOLSIZE);
233      }
234    } else {
235      LOG.trace("Cleaner chore disabled! Not cleaning.");
236    }
237  }
238
239  private void preRunCleaner() {
240    cleanersChain.forEach(FileCleanerDelegate::preClean);
241  }
242
243  public Boolean runCleaner() {
244    preRunCleaner();
245    CleanerTask task = new CleanerTask(this.oldFileDir, true);
246    CHOREPOOL.submit(task);
247    return task.join();
248  }
249
250  /**
251   * Sort the given list in (descending) order of the space each element takes
252   * @param dirs the list to sort, element in it should be directory (not file)
253   */
254  private void sortByConsumedSpace(List<FileStatus> dirs) {
255    if (dirs == null || dirs.size() < 2) {
256      // no need to sort for empty or single directory
257      return;
258    }
259    dirs.sort(new Comparator<FileStatus>() {
260      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
261
262      @Override
263      public int compare(FileStatus f1, FileStatus f2) {
264        long f1ConsumedSpace = getSpace(f1);
265        long f2ConsumedSpace = getSpace(f2);
266        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
267      }
268
269      private long getSpace(FileStatus f) {
270        Long cached = directorySpaces.get(f);
271        if (cached != null) {
272          return cached;
273        }
274        try {
275          long space =
276              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
277          directorySpaces.put(f, space);
278          return space;
279        } catch (IOException e) {
280          LOG.trace("Failed to get space consumed by path={}", f, e);
281          return -1;
282        }
283      }
284    });
285  }
286
287  /**
288   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
289   * necessary.
290   * @param files List of FileStatus for the files to check (and possibly delete)
291   * @return true iff successfully deleted all files
292   */
293  private boolean checkAndDeleteFiles(List<FileStatus> files) {
294    if (files == null) {
295      return true;
296    }
297
298    // first check to see if the path is valid
299    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
300    List<FileStatus> invalidFiles = Lists.newArrayList();
301    for (FileStatus file : files) {
302      if (validate(file.getPath())) {
303        validFiles.add(file);
304      } else {
305        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
306        invalidFiles.add(file);
307      }
308    }
309
310    Iterable<FileStatus> deletableValidFiles = validFiles;
311    // check each of the cleaners for the valid files
312    for (T cleaner : cleanersChain) {
313      if (cleaner.isStopped() || this.getStopper().isStopped()) {
314        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
315            + this.oldFileDir);
316        return false;
317      }
318
319      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
320
321      // trace which cleaner is holding on to each file
322      if (LOG.isTraceEnabled()) {
323        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
324        for (FileStatus file : deletableValidFiles) {
325          if (!filteredFileSet.contains(file)) {
326            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
327          }
328        }
329      }
330
331      deletableValidFiles = filteredFiles;
332    }
333
334    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
335    return deleteFiles(filesToDelete) == files.size();
336  }
337
338  /**
339   * Delete the given files
340   * @param filesToDelete files to delete
341   * @return number of deleted files
342   */
343  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
344    int deletedFileCount = 0;
345    for (FileStatus file : filesToDelete) {
346      Path filePath = file.getPath();
347      LOG.trace("Removing {} from archive", filePath);
348      try {
349        boolean success = this.fs.delete(filePath, false);
350        if (success) {
351          deletedFileCount++;
352        } else {
353          LOG.warn("Attempted to delete:" + filePath
354              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
355        }
356      } catch (IOException e) {
357        e = e instanceof RemoteException ?
358                  ((RemoteException)e).unwrapRemoteException() : e;
359        LOG.warn("Error while deleting: " + filePath, e);
360      }
361    }
362    return deletedFileCount;
363  }
364
365  @Override
366  public synchronized void cleanup() {
367    for (T lc : this.cleanersChain) {
368      try {
369        lc.stop("Exiting");
370      } catch (Throwable t) {
371        LOG.warn("Stopping", t);
372      }
373    }
374  }
375
376  @VisibleForTesting
377  int getChorePoolSize() {
378    return CHOREPOOLSIZE;
379  }
380
381  /**
382   * @param enabled
383   */
384  public boolean setEnabled(final boolean enabled) {
385    return this.enabled.getAndSet(enabled);
386  }
387
388  public boolean getEnabled() { return this.enabled.get();
389  }
390
391  private interface Action<T> {
392    T act() throws IOException;
393  }
394
395  /**
396   * Attemps to clean up a directory, its subdirectories, and files.
397   * Return value is true if everything was deleted. false on partial / total failures.
398   */
399  private class CleanerTask extends RecursiveTask<Boolean> {
400    private final Path dir;
401    private final boolean root;
402
403    CleanerTask(final FileStatus dir, final boolean root) {
404      this(dir.getPath(), root);
405    }
406
407    CleanerTask(final Path dir, final boolean root) {
408      this.dir = dir;
409      this.root = root;
410    }
411
412    @Override
413    protected Boolean compute() {
414      LOG.trace("Cleaning under {}", dir);
415      List<FileStatus> subDirs;
416      List<FileStatus> files;
417      try {
418        // if dir doesn't exist, we'll get null back for both of these
419        // which will fall through to succeeding.
420        subDirs = getFilteredStatus(status -> status.isDirectory());
421        files = getFilteredStatus(status -> status.isFile());
422      } catch (IOException ioe) {
423        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
424        return false;
425      }
426
427      boolean nullSubDirs = subDirs == null;
428      if (nullSubDirs) {
429        LOG.trace("There is no subdir under {}", dir);
430      }
431      if (files == null) {
432        LOG.trace("There is no file under {}", dir);
433      }
434
435      int capacity = nullSubDirs ? 0 : subDirs.size();
436      List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
437      if (!nullSubDirs) {
438        sortByConsumedSpace(subDirs);
439        for (FileStatus subdir : subDirs) {
440          CleanerTask task = new CleanerTask(subdir, false);
441          tasks.add(task);
442          task.fork();
443        }
444      }
445
446      boolean result = true;
447      result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
448      result &= deleteAction(() -> getCleanResult(tasks), "subdirs");
449      // if and only if files and subdirs under current dir are deleted successfully, and
450      // it is not the root dir, then task will try to delete it.
451      if (result && !root) {
452        result &= deleteAction(() -> fs.delete(dir, false), "dir");
453      }
454      return result;
455    }
456
457    /**
458     * Get FileStatus with filter.
459     * Pay attention that FSUtils #listStatusWithStatusFilter would return null,
460     * even though status is empty but not null.
461     * @param function a filter function
462     * @return filtered FileStatus or null if dir doesn't exist
463     * @throws IOException if there's an error other than dir not existing
464     */
465    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
466      return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status));
467    }
468
469    /**
470     * Perform a delete on a specified type.
471     * @param deletion a delete
472     * @param type possible values are 'files', 'subdirs', 'dirs'
473     * @return true if it deleted successfully, false otherwise
474     */
475    private boolean deleteAction(Action<Boolean> deletion, String type) {
476      boolean deleted;
477      try {
478        LOG.trace("Start deleting {} under {}", type, dir);
479        deleted = deletion.act();
480      } catch (PathIsNotEmptyDirectoryException exception) {
481        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
482        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
483        // message below.
484        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
485            "exception details at TRACE.", dir);
486        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
487        deleted = false;
488      } catch (IOException ioe) {
489        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
490                  "happening, use following exception when asking on mailing list.",
491                  type, dir, ioe);
492        deleted = false;
493      }
494      LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
495      return deleted;
496    }
497
498    /**
499     * Get cleaner results of subdirs.
500     * @param tasks subdirs cleaner tasks
501     * @return true if all subdirs deleted successfully, false for patial/all failures
502     * @throws IOException something happen during computation
503     */
504    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
505      boolean cleaned = true;
506      try {
507        for (CleanerTask task : tasks) {
508          cleaned &= task.get();
509        }
510      } catch (InterruptedException | ExecutionException e) {
511        throw new IOException(e);
512      }
513      return cleaned;
514    }
515  }
516}