/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
    justification="Static pool will be only updated once.")
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
    implements ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * Configured size of the directory-scan pool. If the value is an integer >= 1, it is used
   * directly as the pool size (capped at the number of available processors); if it is a
   * decimal in (0.0, 1.0], the pool size is that fraction of the available processors.
   * Note that 1.0 is different from 1: the former uses 100% of the cores, while the latter
   * uses only a single thread for the chore's directory scan.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
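  // Illustration only (assuming, say, 8 available processors): "4" yields a 4-thread pool,
  // "0.5" yields 4 threads, "1.0" yields 8, and "1" yields a single thread.
  // See calculatePoolSize(String) for the exact rules.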
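  /**
   * Shared ForkJoinPool wrapper used by all cleaner chores. {@code cleanerLatch} counts the
   * chores currently running so that {@link #updatePool(long)} only replaces the pool once they
   * have finished (or the timeout expires), and {@code reconfigNotification} records that a
   * pool-size change was requested while a chore was running.
   */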
  private static class DirScanPool {
    int size;
    ForkJoinPool pool;
    int cleanerLatch;
    AtomicBoolean reconfigNotification;

    DirScanPool(Configuration conf) {
      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
      size = calculatePoolSize(poolSize);
      // poolSize may be 0 or 0.0 from a careless configuration,
      // double check to make sure.
      size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
      pool = new ForkJoinPool(size);
      LOG.info("Cleaner pool size is {}", size);
      reconfigNotification = new AtomicBoolean(false);
      cleanerLatch = 0;
    }

    /**
     * Checks if pool can be updated. If so, mark for update later.
     * @param conf configuration
     */
    synchronized void markUpdate(Configuration conf) {
      int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
      if (newSize == size) {
        LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
        return;
      }
      size = newSize;
      // Chore is working, update it later.
      reconfigNotification.set(true);
    }

    /**
     * Update pool with new size.
     */
    synchronized void updatePool(long timeout) {
      long stopTime = System.currentTimeMillis() + timeout;
      while (cleanerLatch != 0 && timeout > 0) {
        try {
          wait(timeout);
          timeout = stopTime - System.currentTimeMillis();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
      shutDownNow();
      LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
      pool = new ForkJoinPool(size);
    }

    synchronized void latchCountUp() {
      cleanerLatch++;
    }

    synchronized void latchCountDown() {
      cleanerLatch--;
      notifyAll();
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    synchronized void submit(ForkJoinTask task) {
      pool.submit(task);
    }

    synchronized void shutDownNow() {
      if (pool == null || pool.isShutdown()) {
        return;
      }
      pool.shutdownNow();
    }
  }
  // It could waste resources if each cleaner chore owned its own pool,
  // so all cleaner chores share a single pool.
  private static volatile DirScanPool POOL;

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected final Map<String, Object> params;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  protected List<T> cleanersChain;

  public static void initChorePool(Configuration conf) {
    if (POOL == null) {
      POOL = new DirScanPool(conf);
    }
  }

  public static void shutDownChorePool() {
    if (POOL != null) {
      POOL.shutDownNow();
      POOL = null;
    }
  }
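
  // A minimal usage sketch (for illustration only; the real call sites live elsewhere, e.g.
  // master startup/shutdown and tests, and "MyCleanerChore" is a hypothetical concrete subclass):
  //   CleanerChore.initChorePool(conf);
  //   CleanerChore<?> chore = new MyCleanerChore(...);
  //   // ... schedule the chore, let it run ...
  //   CleanerChore.shutDownChorePool();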

  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
                      FileSystem fs, Path oldFileDir, String confKey) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
  }

  /**
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param params map of parameters that can be used by the cleaner delegates
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
      FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
    super(name, s, sleepPeriod);

    Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call "
      + "CleanerChore.initChorePool(Configuration) before creating a cleaner chore.");
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;
    this.params = params;
    initCleanerChain(confKey);
  }

  /**
   * Calculate size for cleaner pool.
   * @param poolSize size from configuration
   * @return size of pool after calculation
   */
  static int calculatePoolSize(String poolSize) {
    if (poolSize.matches("[1-9][0-9]*")) {
      // If poolSize is an integer, return it directly,
      // but cap it at the number of available processors.
      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
      if (size == AVAIL_PROCESSORS) {
        LOG.warn("Using all available processors to scan dirs, size={}", size);
      }
      return size;
    } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // If poolSize is a decimal fraction, return poolSize * availableProcessors;
      // Ensure that we always return at least one.
      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
      if (computedThreads < 1) {
        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
        return 1;
      }
      return computedThreads;
    } else {
      LOG.error("Unrecognized value: {} for {}, using default config: {} instead.", poolSize,
          CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
    }
  }

  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
   */
  protected abstract boolean validate(Path file);

  /**
   * Instantiate and initialize all the file cleaners set in the configuration
   * @param confKey key to get the file cleaner classes from the configuration
   */
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners != null) {
      for (String className : logCleaners) {
        T logCleaner = newFileCleaner(className, conf);
        if (logCleaner != null) {
          LOG.debug("Initialize cleaner={}", className);
          this.cleanersChain.add(logCleaner);
        }
      }
    }
  }
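
  // For illustration only (the actual key and delegate classes depend on the concrete chore): a
  // cleaner chain is configured as a comma-separated list of FileCleanerDelegate implementations,
  // e.g. in hbase-site.xml:
  //   <property>
  //     <name>hbase.master.logcleaner.plugins</name>
  //     <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner</value>
  //   </property>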

  @Override
  public void onConfigurationChange(Configuration conf) {
    POOL.markUpdate(conf);
  }

  /**
   * A utility method to create a new instance of a FileCleanerDelegate based on its fully
   * qualified class name.
   * @param className fully qualified class name of the FileCleanerDelegate
   * @param conf configuration to use
   * @return the new instance, or null if it could not be instantiated
   */
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
        FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
      // skipping if can't instantiate
      return null;
    }
  }

  @Override
  protected void chore() {
    if (getEnabled()) {
      try {
        POOL.latchCountUp();
        if (runCleaner()) {
          LOG.trace("Cleaned all files under {}", oldFileDir);
        } else {
          LOG.trace("Files outstanding under {}", oldFileDir);
        }
      } finally {
        POOL.latchCountDown();
      }
      // After each run, check whether a reconfiguration notification arrived while cleaning.
      // The first cleaner to see it clears the notification, so other cleaners don't update
      // the pool again.
      if (POOL.reconfigNotification.compareAndSet(true, false)) {
        // This cleaner waits for the other cleaners to finish their jobs.
        // To avoid missing the next chore run, wait at most 0.8 * period before shutting the
        // pool down.
        POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
      }
    } else {
      LOG.trace("Cleaner chore disabled! Not cleaning.");
    }
  }

  private void preRunCleaner() {
    cleanersChain.forEach(FileCleanerDelegate::preClean);
  }

  public Boolean runCleaner() {
    preRunCleaner();
    CleanerTask task = new CleanerTask(this.oldFileDir, true);
    POOL.submit(task);
    return task.join();
  }

  /**
   * Sort the given list in descending order of the space each element consumes.
   * @param dirs the list to sort; elements should be directories (not files)
   */
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for empty or single directory
      return;
    }
    dirs.sort(new Comparator<FileStatus>() {
      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();

      @Override
      public int compare(FileStatus f1, FileStatus f2) {
        long f1ConsumedSpace = getSpace(f1);
        long f2ConsumedSpace = getSpace(f2);
        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
      }

      private long getSpace(FileStatus f) {
        Long cached = directorySpaces.get(f);
        if (cached != null) {
          return cached;
        }
        try {
          long space =
              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
          directorySpaces.put(f, space);
          return space;
        } catch (IOException e) {
          LOG.trace("Failed to get space consumed by path={}", f, e);
          return -1;
        }
      }
    });
  }

  /**
   * Run the given files through each of the cleaners to see if they should be deleted, deleting
   * them if necessary.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff all files were successfully deleted
   */
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    if (files == null) {
      return true;
    }

    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner " + this.getName() + " is stopped, won't delete any more files in: "
            + this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace(file.getPath() + " is not deletable according to: " + cleaner);
          }
        }
      }

      deletableValidFiles = filteredFiles;
    }

    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

  /**
   * Delete the given files
   * @param filesToDelete files to delete
   * @return number of deleted files
   */
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      LOG.trace("Removing {} from archive", filePath);
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete: " + filePath
              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
        }
      } catch (IOException e) {
        e = e instanceof RemoteException ?
                  ((RemoteException)e).unwrapRemoteException() : e;
        LOG.warn("Error while deleting: " + filePath, e);
      }
    }
    return deletedFileCount;
  }

  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        LOG.warn("Stopping", t);
      }
    }
  }

  @VisibleForTesting
  int getChorePoolSize() {
    return POOL.size;
  }

  /**
   * @param enabled whether the cleaner chore should run
   * @return the previous value of the enabled flag
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  public boolean getEnabled() {
    return this.enabled.get();
  }

  private interface Action<T> {
    T act() throws IOException;
  }

  /**
   * Attempts to clean up a directory, its subdirectories, and files.
   * Returns true if everything was deleted; false on partial or total failures.
   */
  private class CleanerTask extends RecursiveTask<Boolean> {
    private final Path dir;
    private final boolean root;

    CleanerTask(final FileStatus dir, final boolean root) {
      this(dir.getPath(), root);
    }

    CleanerTask(final Path dir, final boolean root) {
      this.dir = dir;
      this.root = root;
    }

    @Override
    protected Boolean compute() {
      LOG.trace("Cleaning under {}", dir);
      List<FileStatus> subDirs;
      List<FileStatus> files;
      try {
        // if dir doesn't exist, we'll get null back for both of these
        // which will fall through to succeeding.
        subDirs = getFilteredStatus(FileStatus::isDirectory);
        files = getFilteredStatus(FileStatus::isFile);
      } catch (IOException ioe) {
        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
        return false;
      }

      boolean allFilesDeleted = true;
      if (!files.isEmpty()) {
        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
      }

      boolean allSubdirsDeleted = true;
      if (!subDirs.isEmpty()) {
        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
        sortByConsumedSpace(subDirs);
        for (FileStatus subdir : subDirs) {
          CleanerTask task = new CleanerTask(subdir, false);
          tasks.add(task);
          task.fork();
        }
        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
      }

      boolean result = allFilesDeleted && allSubdirsDeleted;
      // if and only if files and subdirs under current dir are deleted successfully, and
      // it is not the root dir, then task will try to delete it.
      if (result && !root) {
        result &= deleteAction(() -> fs.delete(dir, false), "dir");
      }
      return result;
    }

    /**
     * Get FileStatus with filter.
     * @param function a filter function
     * @return filtered FileStatus or empty list if dir doesn't exist
     * @throws IOException if there's an error other than dir not existing
     */
    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
        status -> function.test(status))).orElseGet(Collections::emptyList);
    }

    /**
     * Perform a delete on a specified type.
     * @param deletion a delete action
     * @param type possible values are 'files', 'subdirs', 'dir'
     * @return true if the delete was successful, false otherwise
     */
    private boolean deleteAction(Action<Boolean> deletion, String type) {
      boolean deleted;
      try {
        LOG.trace("Start deleting {} under {}", type, dir);
        deleted = deletion.act();
      } catch (PathIsNotEmptyDirectoryException exception) {
        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
        // message below.
        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
            "Exception details at TRACE.", dir);
        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
        deleted = false;
      } catch (IOException ioe) {
        LOG.info("Could not delete {} under {}; might be transient, we'll retry. If it keeps " +
                  "happening, include the following exception when asking on the mailing list.",
                  type, dir, ioe);
        deleted = false;
      }
      LOG.trace("Finished deleting {} under {}, deleted={}", type, dir, deleted);
      return deleted;
    }

    /**
     * Get the cleaning results of the subdirectory tasks.
     * @param tasks cleaner tasks for the subdirectories
     * @return true if all subdirectories were deleted successfully, false for partial/total failures
     * @throws IOException if something goes wrong during computation
     */
    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
      boolean cleaned = true;
      try {
        for (CleanerTask task : tasks) {
          cleaned &= task.get();
        }
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException(e);
      }
      return cleaned;
    }
  }
}