View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.cleaner;
19  
20  import java.io.IOException;
21  import java.util.LinkedList;
22  import java.util.List;
23  import java.util.Map;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.RemoteExceptionHandler;
32  import org.apache.hadoop.hbase.ScheduledChore;
33  import org.apache.hadoop.hbase.Stoppable;
34  import org.apache.hadoop.hbase.util.FSUtils;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  import com.google.common.collect.ImmutableSet;
38  import com.google.common.collect.Iterables;
39  import com.google.common.collect.Lists;
40  
41  /**
42   * Abstract Cleaner that uses a chain of delegates to clean a directory of files
43   * @param <T> Cleaner delegate class that is dynamically loaded from configuration
44   */
45  public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
46  
47    private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
48  
49    private final FileSystem fs;
50    private final Path oldFileDir;
51    private final Configuration conf;
52    protected List<T> cleanersChain;
53    protected Map<String, Object> params;
54  
55    public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
56                        FileSystem fs, Path oldFileDir, String confKey) {
57      this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
58    }
59  
60    /**
61     * @param name name of the chore being run
62     * @param sleepPeriod the period of time to sleep between each run
63     * @param s the stopper
64     * @param conf configuration to use
65     * @param fs handle to the FS
66     * @param oldFileDir the path to the archived files
67     * @param confKey configuration key for the classes to instantiate
68     * @param params members could be used in cleaner
69     */
70    public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
71        FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
72      super(name, s, sleepPeriod);
73      this.fs = fs;
74      this.oldFileDir = oldFileDir;
75      this.conf = conf;
76      this.params = params;
77      initCleanerChain(confKey);
78    }
79  
80  
81    /**
82     * Validate the file to see if it even belongs in the directory. If it is valid, then the file
83     * will go through the cleaner delegates, but otherwise the file is just deleted.
84     * @param file full {@link Path} of the file to be checked
85     * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
86     */
87    protected abstract boolean validate(Path file);
88  
89    /**
90     * Instantiate and initialize all the file cleaners set in the configuration
91     * @param confKey key to get the file cleaner classes from the configuration
92     */
93    private void initCleanerChain(String confKey) {
94      this.cleanersChain = new LinkedList<T>();
95      String[] logCleaners = conf.getStrings(confKey);
96      if (logCleaners != null) {
97        for (String className : logCleaners) {
98          T logCleaner = newFileCleaner(className, conf);
99          if (logCleaner != null) {
100           LOG.debug("initialize cleaner=" + className);
101           this.cleanersChain.add(logCleaner);
102         }
103       }
104     }
105   }
106 
107   /**
108    * A utility method to create new instances of LogCleanerDelegate based on the class name of the
109    * LogCleanerDelegate.
110    * @param className fully qualified class name of the LogCleanerDelegate
111    * @param conf
112    * @return the new instance
113    */
114   private T newFileCleaner(String className, Configuration conf) {
115     try {
116       Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
117         FileCleanerDelegate.class);
118       @SuppressWarnings("unchecked")
119       T cleaner = (T) c.newInstance();
120       cleaner.setConf(conf);
121       cleaner.init(this.params);
122       return cleaner;
123     } catch (Exception e) {
124       LOG.warn("Can NOT create CleanerDelegate: " + className, e);
125       // skipping if can't instantiate
126       return null;
127     }
128   }
129 
130   @Override
131   protected void chore() {
132     try {
133       FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
134       checkAndDeleteEntries(files);
135     } catch (IOException e) {
136       e = RemoteExceptionHandler.checkIOException(e);
137       LOG.warn("Error while cleaning the logs", e);
138     }
139   }
140 
141   /**
142    * Loop over the given directory entries, and check whether they can be deleted.
143    * If an entry is itself a directory it will be recursively checked and deleted itself iff
144    * all subentries are deleted (and no new subentries are added in the mean time)
145    *
146    * @param entries directory entries to check
147    * @return true if all entries were successfully deleted
148    */
149   private boolean checkAndDeleteEntries(FileStatus[] entries) {
150     if (entries == null) {
151       return true;
152     }
153     boolean allEntriesDeleted = true;
154     List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
155     for (FileStatus child : entries) {
156       Path path = child.getPath();
157       if (child.isDirectory()) {
158         // for each subdirectory delete it and all entries if possible
159         if (!checkAndDeleteDirectory(path)) {
160           allEntriesDeleted = false;
161         }
162       } else {
163         // collect all files to attempt to delete in one batch
164         files.add(child);
165       }
166     }
167     if (!checkAndDeleteFiles(files)) {
168       allEntriesDeleted = false;
169     }
170     return allEntriesDeleted;
171   }
172   
173   /**
174    * Attempt to delete a directory and all files under that directory. Each child file is passed
175    * through the delegates to see if it can be deleted. If the directory has no children when the
176    * cleaners have finished it is deleted.
177    * <p>
178    * If new children files are added between checks of the directory, the directory will <b>not</b>
179    * be deleted.
180    * @param dir directory to check
181    * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
182    */
183   @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
184     if (LOG.isTraceEnabled()) {
185       LOG.trace("Checking directory: " + dir);
186     }
187 
188     try {
189       FileStatus[] children = FSUtils.listStatus(fs, dir);
190       boolean allChildrenDeleted = checkAndDeleteEntries(children);
191   
192       // if the directory still has children, we can't delete it, so we are done
193       if (!allChildrenDeleted) return false;
194     } catch (IOException e) {
195       e = RemoteExceptionHandler.checkIOException(e);
196       LOG.warn("Error while listing directory: " + dir, e);
197       // couldn't list directory, so don't try to delete, and don't return success
198       return false;
199     }
200 
201     // otherwise, all the children (that we know about) have been deleted, so we should try to
202     // delete this directory. However, don't do so recursively so we don't delete files that have
203     // been added since we last checked.
204     try {
205       return fs.delete(dir, false);
206     } catch (IOException e) {
207       if (LOG.isTraceEnabled()) {
208         LOG.trace("Couldn't delete directory: " + dir, e);
209       }
210       // couldn't delete w/o exception, so we can't return success.
211       return false;
212     }
213   }
214 
215   /**
216    * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
217    * necessary.
218    * @param files List of FileStatus for the files to check (and possibly delete)
219    * @return true iff successfully deleted all files
220    */
221   private boolean checkAndDeleteFiles(List<FileStatus> files) {
222     // first check to see if the path is valid
223     List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
224     List<FileStatus> invalidFiles = Lists.newArrayList();
225     for (FileStatus file : files) {
226       if (validate(file.getPath())) {
227         validFiles.add(file);
228       } else {
229         LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
230         invalidFiles.add(file);
231       }
232     }
233 
234     Iterable<FileStatus> deletableValidFiles = validFiles;
235     // check each of the cleaners for the valid files
236     for (T cleaner : cleanersChain) {
237       if (cleaner.isStopped() || getStopper().isStopped()) {
238         LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
239             + this.oldFileDir);
240         return false;
241       }
242 
243       Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
244       
245       // trace which cleaner is holding on to each file
246       if (LOG.isTraceEnabled()) {
247         ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
248         for (FileStatus file : deletableValidFiles) {
249           if (!filteredFileSet.contains(file)) {
250             LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
251           }
252         }
253       }
254       
255       deletableValidFiles = filteredFiles;
256     }
257     
258     Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
259     int deletedFileCount = 0;
260     for (FileStatus file : filesToDelete) {
261       Path filePath = file.getPath();
262       if (LOG.isDebugEnabled()) {
263         LOG.debug("Removing: " + filePath + " from archive");
264       }
265       try {
266         boolean success = this.fs.delete(filePath, false);
267         if (success) {
268           deletedFileCount++;
269         } else {
270           LOG.warn("Attempted to delete:" + filePath
271               + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
272         }
273       } catch (IOException e) {
274         e = RemoteExceptionHandler.checkIOException(e);
275         LOG.warn("Error while deleting: " + filePath, e);
276       }
277     }
278 
279     return deletedFileCount == files.size();
280   }
281 
282   @Override
283   public void cleanup() {
284     for (T lc : this.cleanersChain) {
285       try {
286         lc.stop("Exiting");
287       } catch (Throwable t) {
288         LOG.warn("Stopping", t);
289       }
290     }
291   }
292 }