View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util.hbck;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ConcurrentSkipListSet;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
44  import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
45  import org.apache.hadoop.hbase.io.hfile.HFile;
46  import org.apache.hadoop.hbase.mob.MobUtils;
47  import org.apache.hadoop.hbase.util.FSUtils;
48  import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
49  import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
50  import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
51  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
52  
53  /**
54   * This class marches through all of the region's hfiles and verifies that
55   * they are all valid files. One just needs to instantiate the class, call
56   * {@link #checkTables(Collection)} and then retrieve the corrupted hfiles (and
57   * quarantined files if in quarantining mode).
58   *
59   * The implementation currently parallelizes at the regionDir level.
60   */
61  @InterfaceAudience.Private
62  public class HFileCorruptionChecker {
63    private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
64  
65    final Configuration conf;
66    final FileSystem fs;
67    final CacheConfig cacheConf;
68    final ExecutorService executor;
69    final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
70    final Set<Path> failures = new ConcurrentSkipListSet<Path>();
71    final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
72    final Set<Path> missing = new ConcurrentSkipListSet<Path>();
73    final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<Path>();
74    final Set<Path> failureMobFiles = new ConcurrentSkipListSet<Path>();
75    final Set<Path> missedMobFiles = new ConcurrentSkipListSet<Path>();
76    final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<Path>();
77    final boolean inQuarantineMode;
78    final AtomicInteger hfilesChecked = new AtomicInteger();
79    final AtomicInteger mobFilesChecked = new AtomicInteger();
80  
81    public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
82        boolean quarantine) throws IOException {
83      this.conf = conf;
84      this.fs = FileSystem.get(conf);
85      this.cacheConf = new CacheConfig(conf);
86      this.executor = executor;
87      this.inQuarantineMode = quarantine;
88    }
89  
90    /**
91     * Checks a path to see if it is a valid hfile.
92     *
93     * @param p
94     *          full Path to an HFile
95     * @throws IOException
96     *           This is a connectivity related exception
97     */
98    protected void checkHFile(Path p) throws IOException {
99      HFile.Reader r = null;
100     try {
101       r = HFile.createReader(fs, p, cacheConf, conf);
102     } catch (CorruptHFileException che) {
103       LOG.warn("Found corrupt HFile " + p, che);
104       corrupted.add(p);
105       if (inQuarantineMode) {
106         Path dest = createQuarantinePath(p);
107         LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
108         boolean success = fs.mkdirs(dest.getParent());
109         success = success ? fs.rename(p, dest): false;
110         if (!success) {
111           failures.add(p);
112         } else {
113           quarantined.add(dest);
114         }
115       }
116       return;
117     } catch (FileNotFoundException fnfe) {
118       LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
119       missing.add(p);
120     } finally {
121       hfilesChecked.addAndGet(1);
122       if (r != null) {
123         r.close(true);
124       }
125     }
126   }
127 
128   /**
129    * Given a path, generates a new path to where we move a corrupted hfile (bad
130    * trailer, no trailer).
131    *
132    * @param hFile
133    *          Path to a corrupt hfile (assumes that it is HBASE_DIR/ table
134    *          /region/cf/file)
135    * @return path to where corrupted files are stored. This should be
136    *         HBASE_DIR/.corrupt/table/region/cf/file.
137    */
138   Path createQuarantinePath(Path hFile) throws IOException {
139     // extract the normal dirs structure
140     Path cfDir = hFile.getParent();
141     Path regionDir = cfDir.getParent();
142     Path tableDir = regionDir.getParent();
143 
144     // build up the corrupted dirs structure
145     Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
146     if (conf.get("hbase.hfile.quarantine.dir") != null) {
147       LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to " + corruptBaseDir);
148     }
149     Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
150     Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
151     Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
152     Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
153     return corruptHfile;
154   }
155
156   /**
157    * Check all files in a column family dir.
158    *
159    * @param cfDir
160    *          column family directory
161    * @throws IOException
162    */
163   protected void checkColFamDir(Path cfDir) throws IOException {
164     FileStatus[] statuses = null;
165     try {
166       statuses = fs.listStatus(cfDir); // use same filter as scanner.
167     } catch (FileNotFoundException fnfe) {
168       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
169       LOG.warn("Colfam Directory " + cfDir +
170           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
171       missing.add(cfDir);
172       return;
173     }
174
175     List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
176     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
177     if (hfs.size() == 0 && !fs.exists(cfDir)) {
178       LOG.warn("Colfam Directory " + cfDir +
179           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
180       missing.add(cfDir);
181       return;
182     }
183     for (FileStatus hfFs : hfs) {
184       Path hf = hfFs.getPath();
185       checkHFile(hf);
186     }
187   }
188
189   /**
190    * Check all files in a mob column family dir.
191    *
192    * @param cfDir
193    *          mob column family directory
194    * @throws IOException
195    */
196   protected void checkMobColFamDir(Path cfDir) throws IOException {
197     FileStatus[] statuses = null;
198     try {
199       statuses = fs.listStatus(cfDir); // use same filter as scanner.
200     } catch (FileNotFoundException fnfe) {
201       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
202       LOG.warn("Mob colfam Directory " + cfDir +
203           " does not exist.  Likely the table is deleted. Skipping.");
204       missedMobFiles.add(cfDir);
205       return;
206     }
207
208     List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
209     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
210     if (hfs.size() == 0 && !fs.exists(cfDir)) {
211       LOG.warn("Mob colfam Directory " + cfDir +
212           " does not exist.  Likely the table is deleted. Skipping.");
213       missedMobFiles.add(cfDir);
214       return;
215     }
216     for (FileStatus hfFs : hfs) {
217       Path hf = hfFs.getPath();
218       checkMobFile(hf);
219     }
220   }
221
222   /**
223    * Checks a path to see if it is a valid mob file.
224    *
225    * @param p
226    *          full Path to a mob file.
227    * @throws IOException
228    *           This is a connectivity related exception
229    */
230   protected void checkMobFile(Path p) throws IOException {
231     HFile.Reader r = null;
232     try {
233       r = HFile.createReader(fs, p, cacheConf, conf);
234     } catch (CorruptHFileException che) {
235       LOG.warn("Found corrupt mob file " + p, che);
236       corruptedMobFiles.add(p);
237       if (inQuarantineMode) {
238         Path dest = createQuarantinePath(p);
239         LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
240         boolean success = fs.mkdirs(dest.getParent());
241         success = success ? fs.rename(p, dest): false;
242         if (!success) {
243           failureMobFiles.add(p);
244         } else {
245           quarantinedMobFiles.add(dest);
246         }
247       }
248       return;
249     } catch (FileNotFoundException fnfe) {
250       LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
251       missedMobFiles.add(p);
252     } finally {
253       mobFilesChecked.addAndGet(1);
254       if (r != null) {
255         r.close(true);
256       }
257     }
258   }
259
260   /**
261    * Checks all the mob files of a table.
262    * @param regionDir The mob region directory
263    * @throws IOException
264    */
265   private void checkMobRegionDir(Path regionDir) throws IOException {
266     if (!fs.exists(regionDir)) {
267       return;
268     }
269     FileStatus[] hfs = null;
270     try {
271       hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
272     } catch (FileNotFoundException fnfe) {
273       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
274       LOG.warn("Mob directory " + regionDir
275         + " does not exist.  Likely the table is deleted. Skipping.");
276       missedMobFiles.add(regionDir);
277       return;
278     }
279
280     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
281     if (hfs.length == 0 && !fs.exists(regionDir)) {
282       LOG.warn("Mob directory " + regionDir
283         + " does not exist.  Likely the table is deleted. Skipping.");
284       missedMobFiles.add(regionDir);
285       return;
286     }
287     for (FileStatus hfFs : hfs) {
288       Path hf = hfFs.getPath();
289       checkMobColFamDir(hf);
290     }
291   }
292
293   /**
294    * Check all column families in a region dir.
295    *
296    * @param regionDir
297    *          region directory
298    * @throws IOException
299    */
300   protected void checkRegionDir(Path regionDir) throws IOException {
301     FileStatus[] statuses = null;
302     try {
303       statuses = fs.listStatus(regionDir);
304     } catch (FileNotFoundException fnfe) {
305       // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
306       LOG.warn("Region Directory " + regionDir +
307           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
308       missing.add(regionDir);
309       return;
310     }
311
312     List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
313     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
314     if (cfs.size() == 0 && !fs.exists(regionDir)) {
315       LOG.warn("Region Directory " + regionDir +
316           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
317       missing.add(regionDir);
318       return;
319     }
320
321     for (FileStatus cfFs : cfs) {
322       Path cfDir = cfFs.getPath();
323       checkColFamDir(cfDir);
324     }
325   }
326
327   /**
328    * Check all the regiondirs in the specified tableDir
329    *
330    * @param tableDir
331    *          path to a table
332    * @throws IOException
333    */
334   void checkTableDir(Path tableDir) throws IOException {
335     List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
336     if (rds == null) {
337       if (!fs.exists(tableDir)) {
338         LOG.warn("Table Directory " + tableDir +
339             " does not exist.  Likely due to concurrent delete. Skipping.");
340         missing.add(tableDir);
341       }
342       return;
343     }
344
345     // Parallelize check at the region dir level
346     List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
347     List<Future<Void>> rdFutures;
348
349     for (FileStatus rdFs : rds) {
350       Path rdDir = rdFs.getPath();
351       RegionDirChecker work = new RegionDirChecker(rdDir);
352       rdcs.add(work);
353     }
354
355     // add mob region
356     rdcs.add(createMobRegionDirChecker(tableDir));
357     // Submit and wait for completion
358     try {
359       rdFutures = executor.invokeAll(rdcs);
360     } catch (InterruptedException ie) {
361       Thread.currentThread().interrupt();
362       LOG.warn("Region dirs checking interrupted!", ie);
363       return;
364     }
365
366     for (int i = 0; i < rdFutures.size(); i++) {
367       Future<Void> f = rdFutures.get(i);
368       try {
369         f.get();
370       } catch (ExecutionException e) {
371         LOG.warn("Failed to quaratine an HFile in regiondir "
372             + rdcs.get(i).regionDir, e.getCause());
373         // rethrow IOExceptions
374         if (e.getCause() instanceof IOException) {
375           throw (IOException) e.getCause();
376         }
377
378         // rethrow RuntimeExceptions
379         if (e.getCause() instanceof RuntimeException) {
380           throw (RuntimeException) e.getCause();
381         }
382
383         // this should never happen
384         LOG.error("Unexpected exception encountered", e);
385         return; // bailing out.
386       } catch (InterruptedException ie) {
387         Thread.currentThread().interrupt();
388         LOG.warn("Region dirs check interrupted!", ie);
389         // bailing out
390         return;
391       }
392     }
393   }
394
395   /**
396    * An individual work item for parallelized regiondir processing. This is
397    * intentionally an inner class so it can use the shared error sets and fs.
398    */
399   private class RegionDirChecker implements Callable<Void> {
400     final Path regionDir;
401
402     RegionDirChecker(Path regionDir) {
403       this.regionDir = regionDir;
404     }
405
406     @Override
407     public Void call() throws IOException {
408       checkRegionDir(regionDir);
409       return null;
410     }
411   }
412
413   /**
414    * An individual work item for parallelized mob dir processing. This is
415    * intentionally an inner class so it can use the shared error sets and fs.
416    */
417   private class MobRegionDirChecker extends RegionDirChecker {
418
419     MobRegionDirChecker(Path regionDir) {
420       super(regionDir);
421     }
422
423     @Override
424     public Void call() throws IOException {
425       checkMobRegionDir(regionDir);
426       return null;
427     }
428   }
429
430   /**
431    * Creates an instance of MobRegionDirChecker.
432    * @param tableDir The current table directory.
433    * @return An instance of MobRegionDirChecker.
434    */
435   private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
436     TableName tableName = FSUtils.getTableName(tableDir);
437     Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
438     return new MobRegionDirChecker(mobDir);
439   }
440
441   /**
442    * Check the specified table dirs for bad hfiles.
443    */
444   public void checkTables(Collection<Path> tables) throws IOException {
445     for (Path t : tables) {
446       checkTableDir(t);
447     }
448   }
449
450   /**
451    * @return the set of check failure file paths after checkTables is called.
452    */
453   public Collection<Path> getFailures() {
454     return new HashSet<Path>(failures);
455   }
456
457   /**
458    * @return the set of corrupted file paths after checkTables is called.
459    */
460   public Collection<Path> getCorrupted() {
461     return new HashSet<Path>(corrupted);
462   }
463
464   /**
465    * @return number of hfiles checked in the last HfileCorruptionChecker run
466    */
467   public int getHFilesChecked() {
468     return hfilesChecked.get();
469   }
470
471   /**
472    * @return the set of successfully quarantined paths after checkTables is called.
473    */
474   public Collection<Path> getQuarantined() {
475     return new HashSet<Path>(quarantined);
476   }
477
478   /**
479    * @return the set of paths that were missing.  Likely due to deletion/moves from
480    *  compaction or flushes.
481    */
482   public Collection<Path> getMissing() {
483     return new HashSet<Path>(missing);
484   }
485
486   /**
487    * @return the set of check failure mob file paths after checkTables is called.
488    */
489   public Collection<Path> getFailureMobFiles() {
490     return new HashSet<Path>(failureMobFiles);
491   }
492
493   /**
494    * @return the set of corrupted mob file paths after checkTables is called.
495    */
496   public Collection<Path> getCorruptedMobFiles() {
497     return new HashSet<Path>(corruptedMobFiles);
498   }
499
500   /**
501    * @return number of mob files checked in the last HfileCorruptionChecker run
502    */
503   public int getMobFilesChecked() {
504     return mobFilesChecked.get();
505   }
506
507   /**
508    * @return the set of successfully quarantined paths after checkTables is called.
509    */
510   public Collection<Path> getQuarantinedMobFiles() {
511     return new HashSet<Path>(quarantinedMobFiles);
512   }
513
514   /**
515    * @return the set of paths that were missing.  Likely due to table deletion or
516    *  deletion/moves from compaction.
517    */
518   public Collection<Path> getMissedMobFiles() {
519     return new HashSet<Path>(missedMobFiles);
520   }
521
522   /**
523    * Print a human readable summary of hfile quarantining operations.
524    * @param out
525    */
526   public void report(ErrorReporter out) {
527     out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
528     out.print("  HFiles corrupted:                  " + corrupted.size());
529     if (inQuarantineMode) {
530       out.print("    HFiles successfully quarantined: " + quarantined.size());
531       for (Path sq : quarantined) {
532         out.print("      " + sq);
533       }
534       out.print("    HFiles failed quarantine:        " + failures.size());
535       for (Path fq : failures) {
536         out.print("      " + fq);
537       }
538     }
539     out.print("    HFiles moved while checking:     " + missing.size());
540     for (Path mq : missing) {
541       out.print("      " + mq);
542     }
543
544     String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
545     String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
546         : "CORRUPTED";
547
548     // print mob-related report
549     if (inQuarantineMode) {
550       out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
551       for (Path sq : quarantinedMobFiles) {
552         out.print("      " + sq);
553       }
554       out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
555       for (Path fq : failureMobFiles) {
556         out.print("      " + fq);
557       }
558     }
559     out.print("    Mob files moved while checking:     " + missedMobFiles.size());
560     for (Path mq : missedMobFiles) {
561       out.print("      " + mq);
562     }
563     String initialMobState = (corruptedMobFiles.size() == 0) ? "OK" : "CORRUPTED";
564     String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
565         : "CORRUPTED";
566
567     if (inQuarantineMode) {
568       out.print("Summary: " + initialState + " => " + fixedState);
569       out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
570     } else {
571       out.print("Summary: " + initialState);
572       out.print("Mob summary: " + initialMobState);
573     }
574   }
575 }