1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util.hbck;
19
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ConcurrentSkipListSet;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
43 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
44 import org.apache.hadoop.hbase.io.hfile.HFile;
45 import org.apache.hadoop.hbase.util.FSUtils;
46 import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
47 import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
48 import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
49 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.Private
60 public class HFileCorruptionChecker {
61 private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
62
63 final Configuration conf;
64 final FileSystem fs;
65 final CacheConfig cacheConf;
66 final ExecutorService executor;
67 final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
68 final Set<Path> failures = new ConcurrentSkipListSet<Path>();
69 final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
70 final Set<Path> missing = new ConcurrentSkipListSet<Path>();
71 final boolean inQuarantineMode;
72 final AtomicInteger hfilesChecked = new AtomicInteger();
73
/**
 * Creates a checker bound to the file system described by {@code conf}.
 *
 * @param conf configuration from which the {@link FileSystem} and the
 *          {@link CacheConfig} are derived
 * @param executor executor on which the per-region-directory checks are run
 * @param quarantine if {@code true}, corrupt HFiles are moved aside instead of
 *          only being reported
 * @throws IOException if the file system cannot be obtained
 */
public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
    boolean quarantine) throws IOException {
  // Plain parameter captures first, then the objects derived from conf.
  this.inQuarantineMode = quarantine;
  this.executor = executor;
  this.conf = conf;
  this.fs = FileSystem.get(conf);
  this.cacheConf = new CacheConfig(conf);
}
82
83
84
85
86
87
88
89
90
91 protected void checkHFile(Path p) throws IOException {
92 HFile.Reader r = null;
93 try {
94 r = HFile.createReader(fs, p, cacheConf, conf);
95 } catch (CorruptHFileException che) {
96 LOG.warn("Found corrupt HFile " + p, che);
97 corrupted.add(p);
98 if (inQuarantineMode) {
99 Path dest = createQuarantinePath(p);
100 LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
101 boolean success = fs.mkdirs(dest.getParent());
102 success = success ? fs.rename(p, dest): false;
103 if (!success) {
104 failures.add(p);
105 } else {
106 quarantined.add(dest);
107 }
108 }
109 return;
110 } catch (FileNotFoundException fnfe) {
111 LOG.warn("HFile " + p + " was missing. Likely removed due to compaction/split?");
112 missing.add(p);
113 } finally {
114 hfilesChecked.addAndGet(1);
115 if (r != null) {
116 r.close(true);
117 }
118 }
119 }
120
121
122
123
124
125
126
127
128
129
130
131 Path createQuarantinePath(Path hFile) throws IOException {
132
133 Path cfDir = hFile.getParent();
134 Path regionDir = cfDir.getParent();
135 Path tableDir = regionDir.getParent();
136
137
138 Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), conf.get(
139 "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
140 Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
141 Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
142 Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
143 Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
144 return corruptHfile;
145 }
146
147
148
149
150
151
152
153
154 protected void checkColFamDir(Path cfDir) throws IOException {
155 FileStatus[] hfs = null;
156 try {
157 hfs = fs.listStatus(cfDir, new HFileFilter(fs));
158 } catch (FileNotFoundException fnfe) {
159
160 LOG.warn("Colfam Directory " + cfDir +
161 " does not exist. Likely due to concurrent split/compaction. Skipping.");
162 missing.add(cfDir);
163 return;
164 }
165
166
167 if (hfs.length == 0 && !fs.exists(cfDir)) {
168 LOG.warn("Colfam Directory " + cfDir +
169 " does not exist. Likely due to concurrent split/compaction. Skipping.");
170 missing.add(cfDir);
171 return;
172 }
173 for (FileStatus hfFs : hfs) {
174 Path hf = hfFs.getPath();
175 checkHFile(hf);
176 }
177 }
178
179
180
181
182
183
184
185
186 protected void checkRegionDir(Path regionDir) throws IOException {
187 FileStatus[] cfs = null;
188 try {
189 cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
190 } catch (FileNotFoundException fnfe) {
191
192 LOG.warn("Region Directory " + regionDir +
193 " does not exist. Likely due to concurrent split/compaction. Skipping.");
194 missing.add(regionDir);
195 return;
196 }
197
198
199 if (cfs.length == 0 && !fs.exists(regionDir)) {
200 LOG.warn("Region Directory " + regionDir +
201 " does not exist. Likely due to concurrent split/compaction. Skipping.");
202 missing.add(regionDir);
203 return;
204 }
205
206 for (FileStatus cfFs : cfs) {
207 Path cfDir = cfFs.getPath();
208 checkColFamDir(cfDir);
209 }
210 }
211
212
213
214
215
216
217
218
219 void checkTableDir(Path tableDir) throws IOException {
220 FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
221 if (rds.length == 0 && !fs.exists(tableDir)) {
222
223 LOG.warn("Table Directory " + tableDir +
224 " does not exist. Likely due to concurrent delete. Skipping.");
225 missing.add(tableDir);
226 return;
227 }
228
229
230 List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
231 List<Future<Void>> rdFutures;
232
233 for (FileStatus rdFs : rds) {
234 Path rdDir = rdFs.getPath();
235 RegionDirChecker work = new RegionDirChecker(rdDir);
236 rdcs.add(work);
237 }
238
239
240 try {
241 rdFutures = executor.invokeAll(rdcs);
242 } catch (InterruptedException ie) {
243 Thread.currentThread().interrupt();
244 LOG.warn("Region dirs checking interrupted!", ie);
245 return;
246 }
247
248 for (int i = 0; i < rdFutures.size(); i++) {
249 Future<Void> f = rdFutures.get(i);
250 try {
251 f.get();
252 } catch (ExecutionException e) {
253 LOG.warn("Failed to quaratine an HFile in regiondir "
254 + rdcs.get(i).regionDir, e.getCause());
255
256 if (e.getCause() instanceof IOException) {
257 throw (IOException) e.getCause();
258 }
259
260
261 if (e.getCause() instanceof RuntimeException) {
262 throw (RuntimeException) e.getCause();
263 }
264
265
266 LOG.error("Unexpected exception encountered", e);
267 return;
268 } catch (InterruptedException ie) {
269 Thread.currentThread().interrupt();
270 LOG.warn("Region dirs check interrupted!", ie);
271
272 return;
273 }
274 }
275 }
276
277
278
279
280
281 private class RegionDirChecker implements Callable<Void> {
282 final Path regionDir;
283
284 RegionDirChecker(Path regionDir) {
285 this.regionDir = regionDir;
286 }
287
288 @Override
289 public Void call() throws IOException {
290 checkRegionDir(regionDir);
291 return null;
292 }
293 }
294
295
296
297
298 public void checkTables(Collection<Path> tables) throws IOException {
299 for (Path t : tables) {
300 checkTableDir(t);
301 }
302 }
303
304
305
306
307 public Collection<Path> getFailures() {
308 return new HashSet<Path>(failures);
309 }
310
311
312
313
314 public Collection<Path> getCorrupted() {
315 return new HashSet<Path>(corrupted);
316 }
317
318
319
320
321 public int getHFilesChecked() {
322 return hfilesChecked.get();
323 }
324
325
326
327
328 public Collection<Path> getQuarantined() {
329 return new HashSet<Path>(quarantined);
330 }
331
332
333
334
335
336 public Collection<Path> getMissing() {
337 return new HashSet<Path>(missing);
338 }
339
340
341
342
343
344 public void report(ErrorReporter out) {
345 out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
346 out.print(" HFiles corrupted: " + corrupted.size());
347 if (inQuarantineMode) {
348 out.print(" HFiles successfully quarantined: " + quarantined.size());
349 for (Path sq : quarantined) {
350 out.print(" " + sq);
351 }
352 out.print(" HFiles failed quarantine: " + failures.size());
353 for (Path fq : failures) {
354 out.print(" " + fq);
355 }
356 }
357 out.print(" HFiles moved while checking: " + missing.size());
358 for (Path mq : missing) {
359 out.print(" " + mq);
360 }
361
362 String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
363 String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
364 : "CORRUPTED";
365
366 if (inQuarantineMode) {
367 out.print("Summary: " + initialState + " => " + fixedState);
368 } else {
369 out.print("Summary: " + initialState);
370 }
371 }
372 }