View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util.hbck;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ConcurrentSkipListSet;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
44  import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
45  import org.apache.hadoop.hbase.io.hfile.HFile;
46  import org.apache.hadoop.hbase.mob.MobUtils;
47  import org.apache.hadoop.hbase.util.FSUtils;
48  import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
49  import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
50  import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
51  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
52  
53  /**
54   * This class marches through all of the region's hfiles and verifies that
55   * they are all valid files. One just needs to instantiate the class, use
56   * checkTables(List<Path>) and then retrieve the corrupted hfiles (and
57   * quarantined files if in quarantining mode)
58   *
59   * The implementation currently parallelizes at the regionDir level.
60   */
61  @InterfaceAudience.Private
62  public class HFileCorruptionChecker {
63    private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
64  
65    final Configuration conf;
66    final FileSystem fs;
67    final CacheConfig cacheConf;
68    final ExecutorService executor;
69    final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
70    final Set<Path> failures = new ConcurrentSkipListSet<Path>();
71    final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
72    final Set<Path> missing = new ConcurrentSkipListSet<Path>();
73    final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<Path>();
74    final Set<Path> failureMobFiles = new ConcurrentSkipListSet<Path>();
75    final Set<Path> missedMobFiles = new ConcurrentSkipListSet<Path>();
76    final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<Path>();
77    final boolean inQuarantineMode;
78    final AtomicInteger hfilesChecked = new AtomicInteger();
79    final AtomicInteger mobFilesChecked = new AtomicInteger();
80  
81    public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
82        boolean quarantine) throws IOException {
83      this.conf = conf;
84      this.fs = FileSystem.get(conf);
85      this.cacheConf = new CacheConfig(conf);
86      this.executor = executor;
87      this.inQuarantineMode = quarantine;
88    }
89  
90    /**
91     * Checks a path to see if it is a valid hfile.
92     *
93     * @param p
94     *          full Path to an HFile
95     * @throws IOException
96     *           This is a connectivity related exception
97     */
98    protected void checkHFile(Path p) throws IOException {
99      HFile.Reader r = null;
100     try {
101       r = HFile.createReader(fs, p, cacheConf, conf);
102     } catch (CorruptHFileException che) {
103       LOG.warn("Found corrupt HFile " + p, che);
104       corrupted.add(p);
105       if (inQuarantineMode) {
106         Path dest = createQuarantinePath(p);
107         LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
108         boolean success = fs.mkdirs(dest.getParent());
109         success = success ? fs.rename(p, dest): false;
110         if (!success) {
111           failures.add(p);
112         } else {
113           quarantined.add(dest);
114         }
115       }
116       return;
117     } catch (FileNotFoundException fnfe) {
118       LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
119       missing.add(p);
120     } finally {
121       hfilesChecked.addAndGet(1);
122       if (r != null) {
123         r.close(true);
124       }
125     }
126   }
127 
128   /**
129    * Given a path, generates a new path to where we move a corrupted hfile (bad
130    * trailer, no trailer).
131    *
132    * @param hFile
133    *          Path to a corrupt hfile (assumes that it is HBASE_DIR/ table
134    *          /region/cf/file)
135    * @return path to where corrupted files are stored. This should be
136    *         HBASE_DIR/.corrupt/table/region/cf/file.
137    */
138   Path createQuarantinePath(Path hFile) throws IOException {
139     // extract the normal dirs structure
140     Path cfDir = hFile.getParent();
141     Path regionDir = cfDir.getParent();
142     Path tableDir = regionDir.getParent();
143 
144     // build up the corrupted dirs strcture
145     Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), conf.get(
146         "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
147     Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
148     Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
149     Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
150     Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
151     return corruptHfile;
152   }
153 
154   /**
155    * Check all files in a column family dir.
156    *
157    * @param cfDir
158    *          column family directory
159    * @throws IOException
160    */
161   protected void checkColFamDir(Path cfDir) throws IOException {
162     FileStatus[] hfs = null;
163     try {
164       hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
165     } catch (FileNotFoundException fnfe) {
166       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
167       LOG.warn("Colfam Directory " + cfDir +
168           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
169       missing.add(cfDir);
170       return;
171     }
172 
173     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
174     if (hfs.length == 0 && !fs.exists(cfDir)) {
175       LOG.warn("Colfam Directory " + cfDir +
176           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
177       missing.add(cfDir);
178       return;
179     }
180     for (FileStatus hfFs : hfs) {
181       Path hf = hfFs.getPath();
182       checkHFile(hf);
183     }
184   }
185 
186   /**
187    * Check all files in a mob column family dir.
188    *
189    * @param cfDir
190    *          mob column family directory
191    * @throws IOException
192    */
193   protected void checkMobColFamDir(Path cfDir) throws IOException {
194     FileStatus[] hfs = null;
195     try {
196       hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
197     } catch (FileNotFoundException fnfe) {
198       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
199       LOG.warn("Mob colfam Directory " + cfDir +
200           " does not exist.  Likely the table is deleted. Skipping.");
201       missedMobFiles.add(cfDir);
202       return;
203     }
204 
205     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
206     if (hfs.length == 0 && !fs.exists(cfDir)) {
207       LOG.warn("Mob colfam Directory " + cfDir +
208           " does not exist.  Likely the table is deleted. Skipping.");
209       missedMobFiles.add(cfDir);
210       return;
211     }
212     for (FileStatus hfFs : hfs) {
213       Path hf = hfFs.getPath();
214       checkMobFile(hf);
215     }
216   }
217 
218   /**
219    * Checks a path to see if it is a valid mob file.
220    *
221    * @param p
222    *          full Path to a mob file.
223    * @throws IOException
224    *           This is a connectivity related exception
225    */
226   protected void checkMobFile(Path p) throws IOException {
227     HFile.Reader r = null;
228     try {
229       r = HFile.createReader(fs, p, cacheConf, conf);
230     } catch (CorruptHFileException che) {
231       LOG.warn("Found corrupt mob file " + p, che);
232       corruptedMobFiles.add(p);
233       if (inQuarantineMode) {
234         Path dest = createQuarantinePath(p);
235         LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
236         boolean success = fs.mkdirs(dest.getParent());
237         success = success ? fs.rename(p, dest): false;
238         if (!success) {
239           failureMobFiles.add(p);
240         } else {
241           quarantinedMobFiles.add(dest);
242         }
243       }
244       return;
245     } catch (FileNotFoundException fnfe) {
246       LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
247       missedMobFiles.add(p);
248     } finally {
249       mobFilesChecked.addAndGet(1);
250       if (r != null) {
251         r.close(true);
252       }
253     }
254   }
255 
256   /**
257    * Checks all the mob files of a table.
258    * @param regionDir The mob region directory
259    * @throws IOException
260    */
261   private void checkMobRegionDir(Path regionDir) throws IOException {
262     if (!fs.exists(regionDir)) {
263       return;
264     }
265     FileStatus[] hfs = null;
266     try {
267       hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
268     } catch (FileNotFoundException fnfe) {
269       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
270       LOG.warn("Mob directory " + regionDir
271         + " does not exist.  Likely the table is deleted. Skipping.");
272       missedMobFiles.add(regionDir);
273       return;
274     }
275 
276     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
277     if (hfs.length == 0 && !fs.exists(regionDir)) {
278       LOG.warn("Mob directory " + regionDir
279         + " does not exist.  Likely the table is deleted. Skipping.");
280       missedMobFiles.add(regionDir);
281       return;
282     }
283     for (FileStatus hfFs : hfs) {
284       Path hf = hfFs.getPath();
285       checkMobColFamDir(hf);
286     }
287   }
288 
289   /**
290    * Check all column families in a region dir.
291    *
292    * @param regionDir
293    *          region directory
294    * @throws IOException
295    */
296   protected void checkRegionDir(Path regionDir) throws IOException {
297     FileStatus[] cfs = null;
298     try {
299       cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
300     } catch (FileNotFoundException fnfe) {
301       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
302       LOG.warn("Region Directory " + regionDir +
303           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
304       missing.add(regionDir);
305       return;
306     }
307 
308     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
309     if (cfs.length == 0 && !fs.exists(regionDir)) {
310       LOG.warn("Region Directory " + regionDir +
311           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
312       missing.add(regionDir);
313       return;
314     }
315 
316     for (FileStatus cfFs : cfs) {
317       Path cfDir = cfFs.getPath();
318       checkColFamDir(cfDir);
319     }
320   }
321 
322   /**
323    * Check all the regiondirs in the specified tableDir
324    *
325    * @param tableDir
326    *          path to a table
327    * @throws IOException
328    */
329   void checkTableDir(Path tableDir) throws IOException {
330     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
331     if (rds.length == 0 && !fs.exists(tableDir)) {
332       // interestingly listStatus does not throw an exception if the path does not exist.
333       LOG.warn("Table Directory " + tableDir +
334           " does not exist.  Likely due to concurrent delete. Skipping.");
335       missing.add(tableDir);
336       return;
337     }
338 
339     // Parallelize check at the region dir level
340     List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
341     List<Future<Void>> rdFutures;
342 
343     for (FileStatus rdFs : rds) {
344       Path rdDir = rdFs.getPath();
345       RegionDirChecker work = new RegionDirChecker(rdDir);
346       rdcs.add(work);
347     }
348 
349     // add mob region
350     rdcs.add(createMobRegionDirChecker(tableDir));
351     // Submit and wait for completion
352     try {
353       rdFutures = executor.invokeAll(rdcs);
354     } catch (InterruptedException ie) {
355       Thread.currentThread().interrupt();
356       LOG.warn("Region dirs checking interrupted!", ie);
357       return;
358     }
359 
360     for (int i = 0; i < rdFutures.size(); i++) {
361       Future<Void> f = rdFutures.get(i);
362       try {
363         f.get();
364       } catch (ExecutionException e) {
365         LOG.warn("Failed to quaratine an HFile in regiondir "
366             + rdcs.get(i).regionDir, e.getCause());
367         // rethrow IOExceptions
368         if (e.getCause() instanceof IOException) {
369           throw (IOException) e.getCause();
370         }
371 
372         // rethrow RuntimeExceptions
373         if (e.getCause() instanceof RuntimeException) {
374           throw (RuntimeException) e.getCause();
375         }
376 
377         // this should never happen
378         LOG.error("Unexpected exception encountered", e);
379         return; // bailing out.
380       } catch (InterruptedException ie) {
381         Thread.currentThread().interrupt();
382         LOG.warn("Region dirs check interrupted!", ie);
383         // bailing out
384         return;
385       }
386     }
387   }
388 
389   /**
390    * An individual work item for parallelized regiondir processing. This is
391    * intentionally an inner class so it can use the shared error sets and fs.
392    */
393   private class RegionDirChecker implements Callable<Void> {
394     final Path regionDir;
395 
396     RegionDirChecker(Path regionDir) {
397       this.regionDir = regionDir;
398     }
399 
400     @Override
401     public Void call() throws IOException {
402       checkRegionDir(regionDir);
403       return null;
404     }
405   }
406 
407   /**
408    * An individual work item for parallelized mob dir processing. This is
409    * intentionally an inner class so it can use the shared error sets and fs.
410    */
411   private class MobRegionDirChecker extends RegionDirChecker {
412 
413     MobRegionDirChecker(Path regionDir) {
414       super(regionDir);
415     }
416 
417     @Override
418     public Void call() throws IOException {
419       checkMobRegionDir(regionDir);
420       return null;
421     }
422   }
423 
424   /**
425    * Creates an instance of MobRegionDirChecker.
426    * @param tableDir The current table directory.
427    * @return An instance of MobRegionDirChecker.
428    */
429   private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
430     TableName tableName = FSUtils.getTableName(tableDir);
431     Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
432     return new MobRegionDirChecker(mobDir);
433   }
434 
435   /**
436    * Check the specified table dirs for bad hfiles.
437    */
438   public void checkTables(Collection<Path> tables) throws IOException {
439     for (Path t : tables) {
440       checkTableDir(t);
441     }
442   }
443 
444   /**
445    * @return the set of check failure file paths after checkTables is called.
446    */
447   public Collection<Path> getFailures() {
448     return new HashSet<Path>(failures);
449   }
450 
451   /**
452    * @return the set of corrupted file paths after checkTables is called.
453    */
454   public Collection<Path> getCorrupted() {
455     return new HashSet<Path>(corrupted);
456   }
457 
458   /**
459    * @return number of hfiles checked in the last HfileCorruptionChecker run
460    */
461   public int getHFilesChecked() {
462     return hfilesChecked.get();
463   }
464 
465   /**
466    * @return the set of successfully quarantined paths after checkTables is called.
467    */
468   public Collection<Path> getQuarantined() {
469     return new HashSet<Path>(quarantined);
470   }
471 
472   /**
473    * @return the set of paths that were missing.  Likely due to deletion/moves from
474    *  compaction or flushes.
475    */
476   public Collection<Path> getMissing() {
477     return new HashSet<Path>(missing);
478   }
479 
480   /**
481    * @return the set of check failure mob file paths after checkTables is called.
482    */
483   public Collection<Path> getFailureMobFiles() {
484     return new HashSet<Path>(failureMobFiles);
485   }
486 
487   /**
488    * @return the set of corrupted mob file paths after checkTables is called.
489    */
490   public Collection<Path> getCorruptedMobFiles() {
491     return new HashSet<Path>(corruptedMobFiles);
492   }
493 
494   /**
495    * @return number of mob files checked in the last HfileCorruptionChecker run
496    */
497   public int getMobFilesChecked() {
498     return mobFilesChecked.get();
499   }
500 
501   /**
502    * @return the set of successfully quarantined paths after checkTables is called.
503    */
504   public Collection<Path> getQuarantinedMobFiles() {
505     return new HashSet<Path>(quarantinedMobFiles);
506   }
507 
508   /**
509    * @return the set of paths that were missing.  Likely due to table deletion or
510    *  deletion/moves from compaction.
511    */
512   public Collection<Path> getMissedMobFiles() {
513     return new HashSet<Path>(missedMobFiles);
514   }
515 
516   /**
517    * Print a human readable summary of hfile quarantining operations.
518    * @param out
519    */
520   public void report(ErrorReporter out) {
521     out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
522     out.print("  HFiles corrupted:                  " + corrupted.size());
523     if (inQuarantineMode) {
524       out.print("    HFiles successfully quarantined: " + quarantined.size());
525       for (Path sq : quarantined) {
526         out.print("      " + sq);
527       }
528       out.print("    HFiles failed quarantine:        " + failures.size());
529       for (Path fq : failures) {
530         out.print("      " + fq);
531       }
532     }
533     out.print("    HFiles moved while checking:     " + missing.size());
534     for (Path mq : missing) {
535       out.print("      " + mq);
536     }
537 
538     String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
539     String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
540         : "CORRUPTED";
541 
542     // print mob-related report
543     if (inQuarantineMode) {
544       out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
545       for (Path sq : quarantinedMobFiles) {
546         out.print("      " + sq);
547       }
548       out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
549       for (Path fq : failureMobFiles) {
550         out.print("      " + fq);
551       }
552     }
553     out.print("    Mob files moved while checking:     " + missedMobFiles.size());
554     for (Path mq : missedMobFiles) {
555       out.print("      " + mq);
556     }
557     String initialMobState = (corruptedMobFiles.size() == 0) ? "OK" : "CORRUPTED";
558     String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
559         : "CORRUPTED";
560 
561     if (inQuarantineMode) {
562       out.print("Summary: " + initialState + " => " + fixedState);
563       out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
564     } else {
565       out.print("Summary: " + initialState);
566       out.print("Mob summary: " + initialMobState);
567     }
568   }
569 }