1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.cleaner;
19
20 import java.io.IOException;
21 import java.util.LinkedList;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.FileStatus;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.RemoteExceptionHandler;
31 import org.apache.hadoop.hbase.ScheduledChore;
32 import org.apache.hadoop.hbase.Stoppable;
33 import org.apache.hadoop.hbase.util.FSUtils;
34
35 import com.google.common.annotations.VisibleForTesting;
36 import com.google.common.collect.ImmutableSet;
37 import com.google.common.collect.Iterables;
38 import com.google.common.collect.Lists;
39
40
41
42
43
44 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
45
46 private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
47
48 private final FileSystem fs;
49 private final Path oldFileDir;
50 private final Configuration conf;
51 protected List<T> cleanersChain;
52
53
54
55
56
57
58
59
60
61
62 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
63 FileSystem fs, Path oldFileDir, String confKey) {
64 super(name, s, sleepPeriod);
65 this.fs = fs;
66 this.oldFileDir = oldFileDir;
67 this.conf = conf;
68
69 initCleanerChain(confKey);
70 }
71
72
73
74
75
76
77
78 protected abstract boolean validate(Path file);
79
80
81
82
83
84 private void initCleanerChain(String confKey) {
85 this.cleanersChain = new LinkedList<T>();
86 String[] logCleaners = conf.getStrings(confKey);
87 if (logCleaners != null) {
88 for (String className : logCleaners) {
89 T logCleaner = newFileCleaner(className, conf);
90 if (logCleaner != null) {
91 LOG.debug("initialize cleaner=" + className);
92 this.cleanersChain.add(logCleaner);
93 }
94 }
95 }
96 }
97
98
99
100
101
102
103
104
105 private T newFileCleaner(String className, Configuration conf) {
106 try {
107 Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
108 FileCleanerDelegate.class);
109 @SuppressWarnings("unchecked")
110 T cleaner = (T) c.newInstance();
111 cleaner.setConf(conf);
112 return cleaner;
113 } catch (Exception e) {
114 LOG.warn("Can NOT create CleanerDelegate: " + className, e);
115
116 return null;
117 }
118 }
119
120 @Override
121 protected void chore() {
122 try {
123 FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
124 checkAndDeleteEntries(files);
125 } catch (IOException e) {
126 e = RemoteExceptionHandler.checkIOException(e);
127 LOG.warn("Error while cleaning the logs", e);
128 }
129 }
130
131
132
133
134
135
136
137
138
139 private boolean checkAndDeleteEntries(FileStatus[] entries) {
140 if (entries == null) {
141 return true;
142 }
143 boolean allEntriesDeleted = true;
144 List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
145 for (FileStatus child : entries) {
146 Path path = child.getPath();
147 if (child.isDirectory()) {
148
149 if (!checkAndDeleteDirectory(path)) {
150 allEntriesDeleted = false;
151 }
152 } else {
153
154 files.add(child);
155 }
156 }
157 if (!checkAndDeleteFiles(files)) {
158 allEntriesDeleted = false;
159 }
160 return allEntriesDeleted;
161 }
162
163
164
165
166
167
168
169
170
171
172
173 @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
174 if (LOG.isTraceEnabled()) {
175 LOG.trace("Checking directory: " + dir);
176 }
177
178 try {
179 FileStatus[] children = FSUtils.listStatus(fs, dir);
180 boolean allChildrenDeleted = checkAndDeleteEntries(children);
181
182
183 if (!allChildrenDeleted) return false;
184 } catch (IOException e) {
185 e = RemoteExceptionHandler.checkIOException(e);
186 LOG.warn("Error while listing directory: " + dir, e);
187
188 return false;
189 }
190
191
192
193
194 try {
195 return fs.delete(dir, false);
196 } catch (IOException e) {
197 if (LOG.isTraceEnabled()) {
198 LOG.trace("Couldn't delete directory: " + dir, e);
199 }
200
201 return false;
202 }
203 }
204
205
206
207
208
209
210
211 private boolean checkAndDeleteFiles(List<FileStatus> files) {
212
213 List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
214 List<FileStatus> invalidFiles = Lists.newArrayList();
215 for (FileStatus file : files) {
216 if (validate(file.getPath())) {
217 validFiles.add(file);
218 } else {
219 LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
220 invalidFiles.add(file);
221 }
222 }
223
224 Iterable<FileStatus> deletableValidFiles = validFiles;
225
226 for (T cleaner : cleanersChain) {
227 if (cleaner.isStopped() || getStopper().isStopped()) {
228 LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
229 + this.oldFileDir);
230 return false;
231 }
232
233 Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
234
235
236 if (LOG.isTraceEnabled()) {
237 ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
238 for (FileStatus file : deletableValidFiles) {
239 if (!filteredFileSet.contains(file)) {
240 LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
241 }
242 }
243 }
244
245 deletableValidFiles = filteredFiles;
246 }
247
248 Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
249 int deletedFileCount = 0;
250 for (FileStatus file : filesToDelete) {
251 Path filePath = file.getPath();
252 if (LOG.isDebugEnabled()) {
253 LOG.debug("Removing: " + filePath + " from archive");
254 }
255 try {
256 boolean success = this.fs.delete(filePath, false);
257 if (success) {
258 deletedFileCount++;
259 } else {
260 LOG.warn("Attempted to delete:" + filePath
261 + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
262 }
263 } catch (IOException e) {
264 e = RemoteExceptionHandler.checkIOException(e);
265 LOG.warn("Error while deleting: " + filePath, e);
266 }
267 }
268
269 return deletedFileCount == files.size();
270 }
271
272 @Override
273 public void cleanup() {
274 for (T lc : this.cleanersChain) {
275 try {
276 lc.stop("Exiting");
277 } catch (Throwable t) {
278 LOG.warn("Stopping", t);
279 }
280 }
281 }
282 }