1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.cleaner;
19
20 import java.io.IOException;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.RemoteExceptionHandler;
32 import org.apache.hadoop.hbase.ScheduledChore;
33 import org.apache.hadoop.hbase.Stoppable;
34 import org.apache.hadoop.hbase.util.FSUtils;
35
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.collect.ImmutableSet;
38 import com.google.common.collect.Iterables;
39 import com.google.common.collect.Lists;
40
41
42
43
44
45 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
46
47 private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
48
49 private final FileSystem fs;
50 private final Path oldFileDir;
51 private final Configuration conf;
52 protected List<T> cleanersChain;
53 protected Map<String, Object> params;
54
55 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
56 FileSystem fs, Path oldFileDir, String confKey) {
57 this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
58 }
59
60
61
62
63
64
65
66
67
68
69
70 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
71 FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
72 super(name, s, sleepPeriod);
73 this.fs = fs;
74 this.oldFileDir = oldFileDir;
75 this.conf = conf;
76 this.params = params;
77 initCleanerChain(confKey);
78 }
79
80
81
82
83
84
85
86
87 protected abstract boolean validate(Path file);
88
89
90
91
92
93 private void initCleanerChain(String confKey) {
94 this.cleanersChain = new LinkedList<T>();
95 String[] logCleaners = conf.getStrings(confKey);
96 if (logCleaners != null) {
97 for (String className : logCleaners) {
98 T logCleaner = newFileCleaner(className, conf);
99 if (logCleaner != null) {
100 LOG.debug("initialize cleaner=" + className);
101 this.cleanersChain.add(logCleaner);
102 }
103 }
104 }
105 }
106
107
108
109
110
111
112
113
114 private T newFileCleaner(String className, Configuration conf) {
115 try {
116 Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
117 FileCleanerDelegate.class);
118 @SuppressWarnings("unchecked")
119 T cleaner = (T) c.newInstance();
120 cleaner.setConf(conf);
121 cleaner.init(this.params);
122 return cleaner;
123 } catch (Exception e) {
124 LOG.warn("Can NOT create CleanerDelegate: " + className, e);
125
126 return null;
127 }
128 }
129
130 @Override
131 protected void chore() {
132 try {
133 FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
134 checkAndDeleteEntries(files);
135 } catch (IOException e) {
136 e = RemoteExceptionHandler.checkIOException(e);
137 LOG.warn("Error while cleaning the logs", e);
138 }
139 }
140
141
142
143
144
145
146
147
148
149 private boolean checkAndDeleteEntries(FileStatus[] entries) {
150 if (entries == null) {
151 return true;
152 }
153 boolean allEntriesDeleted = true;
154 List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
155 for (FileStatus child : entries) {
156 Path path = child.getPath();
157 if (child.isDirectory()) {
158
159 if (!checkAndDeleteDirectory(path)) {
160 allEntriesDeleted = false;
161 }
162 } else {
163
164 files.add(child);
165 }
166 }
167 if (!checkAndDeleteFiles(files)) {
168 allEntriesDeleted = false;
169 }
170 return allEntriesDeleted;
171 }
172
173
174
175
176
177
178
179
180
181
182
183 @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
184 if (LOG.isTraceEnabled()) {
185 LOG.trace("Checking directory: " + dir);
186 }
187
188 try {
189 FileStatus[] children = FSUtils.listStatus(fs, dir);
190 boolean allChildrenDeleted = checkAndDeleteEntries(children);
191
192
193 if (!allChildrenDeleted) return false;
194 } catch (IOException e) {
195 e = RemoteExceptionHandler.checkIOException(e);
196 LOG.warn("Error while listing directory: " + dir, e);
197
198 return false;
199 }
200
201
202
203
204 try {
205 return fs.delete(dir, false);
206 } catch (IOException e) {
207 if (LOG.isTraceEnabled()) {
208 LOG.trace("Couldn't delete directory: " + dir, e);
209 }
210
211 return false;
212 }
213 }
214
215
216
217
218
219
220
221 private boolean checkAndDeleteFiles(List<FileStatus> files) {
222
223 List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
224 List<FileStatus> invalidFiles = Lists.newArrayList();
225 for (FileStatus file : files) {
226 if (validate(file.getPath())) {
227 validFiles.add(file);
228 } else {
229 LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
230 invalidFiles.add(file);
231 }
232 }
233
234 Iterable<FileStatus> deletableValidFiles = validFiles;
235
236 for (T cleaner : cleanersChain) {
237 if (cleaner.isStopped() || getStopper().isStopped()) {
238 LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
239 + this.oldFileDir);
240 return false;
241 }
242
243 Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
244
245
246 if (LOG.isTraceEnabled()) {
247 ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
248 for (FileStatus file : deletableValidFiles) {
249 if (!filteredFileSet.contains(file)) {
250 LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
251 }
252 }
253 }
254
255 deletableValidFiles = filteredFiles;
256 }
257
258 Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
259 int deletedFileCount = 0;
260 for (FileStatus file : filesToDelete) {
261 Path filePath = file.getPath();
262 if (LOG.isDebugEnabled()) {
263 LOG.debug("Removing: " + filePath + " from archive");
264 }
265 try {
266 boolean success = this.fs.delete(filePath, false);
267 if (success) {
268 deletedFileCount++;
269 } else {
270 LOG.warn("Attempted to delete:" + filePath
271 + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
272 }
273 } catch (IOException e) {
274 e = RemoteExceptionHandler.checkIOException(e);
275 LOG.warn("Error while deleting: " + filePath, e);
276 }
277 }
278
279 return deletedFileCount == files.size();
280 }
281
282 @Override
283 public void cleanup() {
284 for (T lc : this.cleanersChain) {
285 try {
286 lc.stop("Exiting");
287 } catch (Throwable t) {
288 LOG.warn("Stopping", t);
289 }
290 }
291 }
292 }