View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.cleaner;
19  
20  import java.io.IOException;
21  import java.util.LinkedList;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.FileStatus;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.RemoteExceptionHandler;
31  import org.apache.hadoop.hbase.ScheduledChore;
32  import org.apache.hadoop.hbase.Stoppable;
33  import org.apache.hadoop.hbase.util.FSUtils;
34  
35  import com.google.common.annotations.VisibleForTesting;
36  import com.google.common.collect.ImmutableSet;
37  import com.google.common.collect.Iterables;
38  import com.google.common.collect.Lists;
39  
40  /**
41   * Abstract Cleaner that uses a chain of delegates to clean a directory of files
42   * @param <T> Cleaner delegate class that is dynamically loaded from configuration
43   */
44  public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
45  
46    private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
47  
48    private final FileSystem fs;
49    private final Path oldFileDir;
50    private final Configuration conf;
51    protected List<T> cleanersChain;
52  
53    /**
54     * @param name name of the chore being run
55     * @param sleepPeriod the period of time to sleep between each run
56     * @param s the stopper
57     * @param conf configuration to use
58     * @param fs handle to the FS
59     * @param oldFileDir the path to the archived files
60     * @param confKey configuration key for the classes to instantiate
61     */
62    public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
63        FileSystem fs, Path oldFileDir, String confKey) {
64      super(name, s, sleepPeriod);
65      this.fs = fs;
66      this.oldFileDir = oldFileDir;
67      this.conf = conf;
68  
69      initCleanerChain(confKey);
70    }
71  
72    /**
73     * Validate the file to see if it even belongs in the directory. If it is valid, then the file
74     * will go through the cleaner delegates, but otherwise the file is just deleted.
75     * @param file full {@link Path} of the file to be checked
76     * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
77     */
78    protected abstract boolean validate(Path file);
79  
80    /**
81     * Instantiate and initialize all the file cleaners set in the configuration
82     * @param confKey key to get the file cleaner classes from the configuration
83     */
84    private void initCleanerChain(String confKey) {
85      this.cleanersChain = new LinkedList<T>();
86      String[] logCleaners = conf.getStrings(confKey);
87      if (logCleaners != null) {
88        for (String className : logCleaners) {
89          T logCleaner = newFileCleaner(className, conf);
90          if (logCleaner != null) {
91            LOG.debug("initialize cleaner=" + className);
92            this.cleanersChain.add(logCleaner);
93          }
94        }
95      }
96    }
97  
98    /**
99     * A utility method to create new instances of LogCleanerDelegate based on the class name of the
100    * LogCleanerDelegate.
101    * @param className fully qualified class name of the LogCleanerDelegate
102    * @param conf
103    * @return the new instance
104    */
105   private T newFileCleaner(String className, Configuration conf) {
106     try {
107       Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
108         FileCleanerDelegate.class);
109       @SuppressWarnings("unchecked")
110       T cleaner = (T) c.newInstance();
111       cleaner.setConf(conf);
112       return cleaner;
113     } catch (Exception e) {
114       LOG.warn("Can NOT create CleanerDelegate: " + className, e);
115       // skipping if can't instantiate
116       return null;
117     }
118   }
119 
120   @Override
121   protected void chore() {
122     try {
123       FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
124       checkAndDeleteEntries(files);
125     } catch (IOException e) {
126       e = RemoteExceptionHandler.checkIOException(e);
127       LOG.warn("Error while cleaning the logs", e);
128     }
129   }
130 
131   /**
132    * Loop over the given directory entries, and check whether they can be deleted.
133    * If an entry is itself a directory it will be recursively checked and deleted itself iff
134    * all subentries are deleted (and no new subentries are added in the mean time)
135    *
136    * @param entries directory entries to check
137    * @return true if all entries were successfully deleted
138    */
139   private boolean checkAndDeleteEntries(FileStatus[] entries) {
140     if (entries == null) {
141       return true;
142     }
143     boolean allEntriesDeleted = true;
144     List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
145     for (FileStatus child : entries) {
146       Path path = child.getPath();
147       if (child.isDirectory()) {
148         // for each subdirectory delete it and all entries if possible
149         if (!checkAndDeleteDirectory(path)) {
150           allEntriesDeleted = false;
151         }
152       } else {
153         // collect all files to attempt to delete in one batch
154         files.add(child);
155       }
156     }
157     if (!checkAndDeleteFiles(files)) {
158       allEntriesDeleted = false;
159     }
160     return allEntriesDeleted;
161   }
162   
163   /**
164    * Attempt to delete a directory and all files under that directory. Each child file is passed
165    * through the delegates to see if it can be deleted. If the directory has no children when the
166    * cleaners have finished it is deleted.
167    * <p>
168    * If new children files are added between checks of the directory, the directory will <b>not</b>
169    * be deleted.
170    * @param dir directory to check
171    * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
172    */
173   @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
174     if (LOG.isTraceEnabled()) {
175       LOG.trace("Checking directory: " + dir);
176     }
177 
178     try {
179       FileStatus[] children = FSUtils.listStatus(fs, dir);
180       boolean allChildrenDeleted = checkAndDeleteEntries(children);
181   
182       // if the directory still has children, we can't delete it, so we are done
183       if (!allChildrenDeleted) return false;
184     } catch (IOException e) {
185       e = RemoteExceptionHandler.checkIOException(e);
186       LOG.warn("Error while listing directory: " + dir, e);
187       // couldn't list directory, so don't try to delete, and don't return success
188       return false;
189     }
190 
191     // otherwise, all the children (that we know about) have been deleted, so we should try to
192     // delete this directory. However, don't do so recursively so we don't delete files that have
193     // been added since we last checked.
194     try {
195       return fs.delete(dir, false);
196     } catch (IOException e) {
197       if (LOG.isTraceEnabled()) {
198         LOG.trace("Couldn't delete directory: " + dir, e);
199       }
200       // couldn't delete w/o exception, so we can't return success.
201       return false;
202     }
203   }
204 
205   /**
206    * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
207    * necessary.
208    * @param files List of FileStatus for the files to check (and possibly delete)
209    * @return true iff successfully deleted all files
210    */
211   private boolean checkAndDeleteFiles(List<FileStatus> files) {
212     // first check to see if the path is valid
213     List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
214     List<FileStatus> invalidFiles = Lists.newArrayList();
215     for (FileStatus file : files) {
216       if (validate(file.getPath())) {
217         validFiles.add(file);
218       } else {
219         LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
220         invalidFiles.add(file);
221       }
222     }
223 
224     Iterable<FileStatus> deletableValidFiles = validFiles;
225     // check each of the cleaners for the valid files
226     for (T cleaner : cleanersChain) {
227       if (cleaner.isStopped() || getStopper().isStopped()) {
228         LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
229             + this.oldFileDir);
230         return false;
231       }
232 
233       Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
234       
235       // trace which cleaner is holding on to each file
236       if (LOG.isTraceEnabled()) {
237         ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
238         for (FileStatus file : deletableValidFiles) {
239           if (!filteredFileSet.contains(file)) {
240             LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
241           }
242         }
243       }
244       
245       deletableValidFiles = filteredFiles;
246     }
247     
248     Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
249     int deletedFileCount = 0;
250     for (FileStatus file : filesToDelete) {
251       Path filePath = file.getPath();
252       if (LOG.isDebugEnabled()) {
253         LOG.debug("Removing: " + filePath + " from archive");
254       }
255       try {
256         boolean success = this.fs.delete(filePath, false);
257         if (success) {
258           deletedFileCount++;
259         } else {
260           LOG.warn("Attempted to delete:" + filePath
261               + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
262         }
263       } catch (IOException e) {
264         e = RemoteExceptionHandler.checkIOException(e);
265         LOG.warn("Error while deleting: " + filePath, e);
266       }
267     }
268 
269     return deletedFileCount == files.size();
270   }
271 
272   @Override
273   public void cleanup() {
274     for (T lc : this.cleanersChain) {
275       try {
276         lc.stop("Exiting");
277       } catch (Throwable t) {
278         LOG.warn("Stopping", t);
279       }
280     }
281   }
282 }