View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util.hbck;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ConcurrentSkipListSet;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
44  import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
45  import org.apache.hadoop.hbase.io.hfile.HFile;
46  import org.apache.hadoop.hbase.mob.MobUtils;
47  import org.apache.hadoop.hbase.util.FSUtils;
48  import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
49  import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
50  import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
51  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
52  
/**
 * This class marches through all of the region's hfiles and verifies that
 * they are all valid files. One just needs to instantiate the class, use
 * {@code checkTables(List<Path>)} and then retrieve the corrupted hfiles (and
 * quarantined files if in quarantining mode).
 *
 * The implementation currently parallelizes at the regionDir level.
 */
61  @InterfaceAudience.Private
62  public class HFileCorruptionChecker {
63    private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
64  
65    final Configuration conf;
66    final FileSystem fs;
67    final CacheConfig cacheConf;
68    final ExecutorService executor;
69    final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
70    final Set<Path> failures = new ConcurrentSkipListSet<Path>();
71    final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
72    final Set<Path> missing = new ConcurrentSkipListSet<Path>();
73    final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<Path>();
74    final Set<Path> failureMobFiles = new ConcurrentSkipListSet<Path>();
75    final Set<Path> missedMobFiles = new ConcurrentSkipListSet<Path>();
76    final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<Path>();
77    final boolean inQuarantineMode;
78    final AtomicInteger hfilesChecked = new AtomicInteger();
79    final AtomicInteger mobFilesChecked = new AtomicInteger();
80  
81    public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
82        boolean quarantine) throws IOException {
83      this.conf = conf;
84      this.fs = FileSystem.get(conf);
85      this.cacheConf = new CacheConfig(conf);
86      this.executor = executor;
87      this.inQuarantineMode = quarantine;
88    }
89  
90    /**
91     * Checks a path to see if it is a valid hfile.
92     *
93     * @param p
94     *          full Path to an HFile
95     * @throws IOException
96     *           This is a connectivity related exception
97     */
98    protected void checkHFile(Path p) throws IOException {
99      HFile.Reader r = null;
100     try {
101       r = HFile.createReader(fs, p, cacheConf, conf);
102     } catch (CorruptHFileException che) {
103       LOG.warn("Found corrupt HFile " + p, che);
104       corrupted.add(p);
105       if (inQuarantineMode) {
106         Path dest = createQuarantinePath(p);
107         LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
108         boolean success = fs.mkdirs(dest.getParent());
109         success = success ? fs.rename(p, dest): false;
110         if (!success) {
111           failures.add(p);
112         } else {
113           quarantined.add(dest);
114         }
115       }
116       return;
117     } catch (FileNotFoundException fnfe) {
118       LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
119       missing.add(p);
120     } finally {
121       hfilesChecked.addAndGet(1);
122       if (r != null) {
123         r.close(true);
124       }
125     }
126   }
127 
128   /**
129    * Given a path, generates a new path to where we move a corrupted hfile (bad
130    * trailer, no trailer).
131    *
132    * @param hFile
133    *          Path to a corrupt hfile (assumes that it is HBASE_DIR/ table
134    *          /region/cf/file)
135    * @return path to where corrupted files are stored. This should be
136    *         HBASE_DIR/.corrupt/table/region/cf/file.
137    */
138   Path createQuarantinePath(Path hFile) throws IOException {
139     // extract the normal dirs structure
140     Path cfDir = hFile.getParent();
141     Path regionDir = cfDir.getParent();
142     Path tableDir = regionDir.getParent();
143 
144     // build up the corrupted dirs strcture
145     Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), conf.get(
146         "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
147     Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
148     Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
149     Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
150     Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
151     return corruptHfile;
152   }
153 
154   /**
155    * Check all files in a column family dir.
156    *
157    * @param cfDir
158    *          column family directory
159    * @throws IOException
160    */
161   protected void checkColFamDir(Path cfDir) throws IOException {
162     FileStatus[] statuses = null;
163     try {
164       statuses = fs.listStatus(cfDir); // use same filter as scanner.
165     } catch (FileNotFoundException fnfe) {
166       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
167       LOG.warn("Colfam Directory " + cfDir +
168           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
169       missing.add(cfDir);
170       return;
171     }
172 
173     List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
174     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
175     if (hfs.size() == 0 && !fs.exists(cfDir)) {
176       LOG.warn("Colfam Directory " + cfDir +
177           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
178       missing.add(cfDir);
179       return;
180     }
181     for (FileStatus hfFs : hfs) {
182       Path hf = hfFs.getPath();
183       checkHFile(hf);
184     }
185   }
186
187   /**
188    * Check all files in a mob column family dir.
189    *
190    * @param cfDir
191    *          mob column family directory
192    * @throws IOException
193    */
194   protected void checkMobColFamDir(Path cfDir) throws IOException {
195     FileStatus[] statuses = null;
196     try {
197       statuses = fs.listStatus(cfDir); // use same filter as scanner.
198     } catch (FileNotFoundException fnfe) {
199       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
200       LOG.warn("Mob colfam Directory " + cfDir +
201           " does not exist.  Likely the table is deleted. Skipping.");
202       missedMobFiles.add(cfDir);
203       return;
204     }
205
206     List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
207     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
208     if (hfs.size() == 0 && !fs.exists(cfDir)) {
209       LOG.warn("Mob colfam Directory " + cfDir +
210           " does not exist.  Likely the table is deleted. Skipping.");
211       missedMobFiles.add(cfDir);
212       return;
213     }
214     for (FileStatus hfFs : hfs) {
215       Path hf = hfFs.getPath();
216       checkMobFile(hf);
217     }
218   }
219
220   /**
221    * Checks a path to see if it is a valid mob file.
222    *
223    * @param p
224    *          full Path to a mob file.
225    * @throws IOException
226    *           This is a connectivity related exception
227    */
228   protected void checkMobFile(Path p) throws IOException {
229     HFile.Reader r = null;
230     try {
231       r = HFile.createReader(fs, p, cacheConf, conf);
232     } catch (CorruptHFileException che) {
233       LOG.warn("Found corrupt mob file " + p, che);
234       corruptedMobFiles.add(p);
235       if (inQuarantineMode) {
236         Path dest = createQuarantinePath(p);
237         LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
238         boolean success = fs.mkdirs(dest.getParent());
239         success = success ? fs.rename(p, dest): false;
240         if (!success) {
241           failureMobFiles.add(p);
242         } else {
243           quarantinedMobFiles.add(dest);
244         }
245       }
246       return;
247     } catch (FileNotFoundException fnfe) {
248       LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
249       missedMobFiles.add(p);
250     } finally {
251       mobFilesChecked.addAndGet(1);
252       if (r != null) {
253         r.close(true);
254       }
255     }
256   }
257
258   /**
259    * Checks all the mob files of a table.
260    * @param regionDir The mob region directory
261    * @throws IOException
262    */
263   private void checkMobRegionDir(Path regionDir) throws IOException {
264     if (!fs.exists(regionDir)) {
265       return;
266     }
267     FileStatus[] hfs = null;
268     try {
269       hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
270     } catch (FileNotFoundException fnfe) {
271       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
272       LOG.warn("Mob directory " + regionDir
273         + " does not exist.  Likely the table is deleted. Skipping.");
274       missedMobFiles.add(regionDir);
275       return;
276     }
277
278     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
279     if (hfs.length == 0 && !fs.exists(regionDir)) {
280       LOG.warn("Mob directory " + regionDir
281         + " does not exist.  Likely the table is deleted. Skipping.");
282       missedMobFiles.add(regionDir);
283       return;
284     }
285     for (FileStatus hfFs : hfs) {
286       Path hf = hfFs.getPath();
287       checkMobColFamDir(hf);
288     }
289   }
290
291   /**
292    * Check all column families in a region dir.
293    *
294    * @param regionDir
295    *          region directory
296    * @throws IOException
297    */
298   protected void checkRegionDir(Path regionDir) throws IOException {
299     FileStatus[] statuses = null;
300     try {
301       statuses = fs.listStatus(regionDir);
302     } catch (FileNotFoundException fnfe) {
303       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
304       LOG.warn("Region Directory " + regionDir +
305           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
306       missing.add(regionDir);
307       return;
308     }
309
310     List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
311     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
312     if (cfs.size() == 0 && !fs.exists(regionDir)) {
313       LOG.warn("Region Directory " + regionDir +
314           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
315       missing.add(regionDir);
316       return;
317     }
318
319     for (FileStatus cfFs : cfs) {
320       Path cfDir = cfFs.getPath();
321       checkColFamDir(cfDir);
322     }
323   }
324
325   /**
326    * Check all the regiondirs in the specified tableDir
327    *
328    * @param tableDir
329    *          path to a table
330    * @throws IOException
331    */
332   void checkTableDir(Path tableDir) throws IOException {
333     List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
334     if (rds == null) {
335       if (!fs.exists(tableDir)) {
336         LOG.warn("Table Directory " + tableDir +
337             " does not exist.  Likely due to concurrent delete. Skipping.");
338         missing.add(tableDir);
339       }
340       return;
341     }
342 
343     // Parallelize check at the region dir level
344     List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
345     List<Future<Void>> rdFutures;
346
347     for (FileStatus rdFs : rds) {
348       Path rdDir = rdFs.getPath();
349       RegionDirChecker work = new RegionDirChecker(rdDir);
350       rdcs.add(work);
351     }
352
353     // add mob region
354     rdcs.add(createMobRegionDirChecker(tableDir));
355     // Submit and wait for completion
356     try {
357       rdFutures = executor.invokeAll(rdcs);
358     } catch (InterruptedException ie) {
359       Thread.currentThread().interrupt();
360       LOG.warn("Region dirs checking interrupted!", ie);
361       return;
362     }
363
364     for (int i = 0; i < rdFutures.size(); i++) {
365       Future<Void> f = rdFutures.get(i);
366       try {
367         f.get();
368       } catch (ExecutionException e) {
369         LOG.warn("Failed to quaratine an HFile in regiondir "
370             + rdcs.get(i).regionDir, e.getCause());
371         // rethrow IOExceptions
372         if (e.getCause() instanceof IOException) {
373           throw (IOException) e.getCause();
374         }
375
376         // rethrow RuntimeExceptions
377         if (e.getCause() instanceof RuntimeException) {
378           throw (RuntimeException) e.getCause();
379         }
380
381         // this should never happen
382         LOG.error("Unexpected exception encountered", e);
383         return; // bailing out.
384       } catch (InterruptedException ie) {
385         Thread.currentThread().interrupt();
386         LOG.warn("Region dirs check interrupted!", ie);
387         // bailing out
388         return;
389       }
390     }
391   }
392
  /**
   * An individual work item for parallelized regiondir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class RegionDirChecker implements Callable<Void> {
    // Region dir this worker checks; also read by checkTableDir for error reporting.
    final Path regionDir;

    RegionDirChecker(Path regionDir) {
      this.regionDir = regionDir;
    }

    @Override
    public Void call() throws IOException {
      // Delegates to the enclosing checker; results accumulate in the shared sets.
      checkRegionDir(regionDir);
      return null;
    }
  }
410
  /**
   * An individual work item for parallelized mob dir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class MobRegionDirChecker extends RegionDirChecker {

    MobRegionDirChecker(Path regionDir) {
      super(regionDir);
    }

    @Override
    public Void call() throws IOException {
      // Same work-item shape as RegionDirChecker, but walks the mob region instead.
      checkMobRegionDir(regionDir);
      return null;
    }
  }
427
428   /**
429    * Creates an instance of MobRegionDirChecker.
430    * @param tableDir The current table directory.
431    * @return An instance of MobRegionDirChecker.
432    */
433   private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
434     TableName tableName = FSUtils.getTableName(tableDir);
435     Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
436     return new MobRegionDirChecker(mobDir);
437   }
438
439   /**
440    * Check the specified table dirs for bad hfiles.
441    */
442   public void checkTables(Collection<Path> tables) throws IOException {
443     for (Path t : tables) {
444       checkTableDir(t);
445     }
446   }
447
448   /**
449    * @return the set of check failure file paths after checkTables is called.
450    */
451   public Collection<Path> getFailures() {
452     return new HashSet<Path>(failures);
453   }
454
455   /**
456    * @return the set of corrupted file paths after checkTables is called.
457    */
458   public Collection<Path> getCorrupted() {
459     return new HashSet<Path>(corrupted);
460   }
461
462   /**
463    * @return number of hfiles checked in the last HfileCorruptionChecker run
464    */
465   public int getHFilesChecked() {
466     return hfilesChecked.get();
467   }
468
469   /**
470    * @return the set of successfully quarantined paths after checkTables is called.
471    */
472   public Collection<Path> getQuarantined() {
473     return new HashSet<Path>(quarantined);
474   }
475
476   /**
477    * @return the set of paths that were missing.  Likely due to deletion/moves from
478    *  compaction or flushes.
479    */
480   public Collection<Path> getMissing() {
481     return new HashSet<Path>(missing);
482   }
483
484   /**
485    * @return the set of check failure mob file paths after checkTables is called.
486    */
487   public Collection<Path> getFailureMobFiles() {
488     return new HashSet<Path>(failureMobFiles);
489   }
490
491   /**
492    * @return the set of corrupted mob file paths after checkTables is called.
493    */
494   public Collection<Path> getCorruptedMobFiles() {
495     return new HashSet<Path>(corruptedMobFiles);
496   }
497
498   /**
499    * @return number of mob files checked in the last HfileCorruptionChecker run
500    */
501   public int getMobFilesChecked() {
502     return mobFilesChecked.get();
503   }
504
505   /**
506    * @return the set of successfully quarantined paths after checkTables is called.
507    */
508   public Collection<Path> getQuarantinedMobFiles() {
509     return new HashSet<Path>(quarantinedMobFiles);
510   }
511
512   /**
513    * @return the set of paths that were missing.  Likely due to table deletion or
514    *  deletion/moves from compaction.
515    */
516   public Collection<Path> getMissedMobFiles() {
517     return new HashSet<Path>(missedMobFiles);
518   }
519
520   /**
521    * Print a human readable summary of hfile quarantining operations.
522    * @param out
523    */
524   public void report(ErrorReporter out) {
525     out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
526     out.print("  HFiles corrupted:                  " + corrupted.size());
527     if (inQuarantineMode) {
528       out.print("    HFiles successfully quarantined: " + quarantined.size());
529       for (Path sq : quarantined) {
530         out.print("      " + sq);
531       }
532       out.print("    HFiles failed quarantine:        " + failures.size());
533       for (Path fq : failures) {
534         out.print("      " + fq);
535       }
536     }
537     out.print("    HFiles moved while checking:     " + missing.size());
538     for (Path mq : missing) {
539       out.print("      " + mq);
540     }
541 
542     String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
543     String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
544         : "CORRUPTED";
545
546     // print mob-related report
547     if (inQuarantineMode) {
548       out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
549       for (Path sq : quarantinedMobFiles) {
550         out.print("      " + sq);
551       }
552       out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
553       for (Path fq : failureMobFiles) {
554         out.print("      " + fq);
555       }
556     }
557     out.print("    Mob files moved while checking:     " + missedMobFiles.size());
558     for (Path mq : missedMobFiles) {
559       out.print("      " + mq);
560     }
561     String initialMobState = (corruptedMobFiles.size() == 0) ? "OK" : "CORRUPTED";
562     String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
563         : "CORRUPTED";
564
565     if (inQuarantineMode) {
566       out.print("Summary: " + initialState + " => " + fixedState);
567       out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
568     } else {
569       out.print("Summary: " + initialState);
570       out.print("Mob summary: " + initialMobState);
571     }
572   }
573 }