/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;

/**
 * This class marches through all of the regions' hfiles and verifies that
 * they are all valid files. Instantiate the class, call
 * {@code checkTables(Collection<Path>)}, and then retrieve the corrupted
 * hfiles (and the quarantined files, if running in quarantine mode).
 *
 * <p>The implementation currently parallelizes at the regionDir level.
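 *
 * <p>A minimal usage sketch (assumes {@code conf} and the table dir list
 * {@code tableDirs} are already in hand; those names are illustrative):
 * <pre>{@code
 * ExecutorService exec = Executors.newFixedThreadPool(4);
 * HFileCorruptionChecker checker =
 *     new HFileCorruptionChecker(conf, exec, true); // quarantine mode
 * checker.checkTables(tableDirs);
 * Collection<Path> corrupt = checker.getCorrupted();
 * Collection<Path> moved = checker.getQuarantined();
 * }</pre>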
 */
public class HFileCorruptionChecker {
  private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);

  final Configuration conf;
  final FileSystem fs;
  final CacheConfig cacheConf;
  final ExecutorService executor;
  final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
  final Set<Path> failures = new ConcurrentSkipListSet<Path>();
  final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
  final Set<Path> missing = new ConcurrentSkipListSet<Path>();
  final boolean inQuarantineMode;
  final AtomicInteger hfilesChecked = new AtomicInteger();

  /**
   * @param conf configuration; also used to resolve the backing FileSystem
   * @param executor executor used to parallelize the per-region checks
   * @param quarantine if true, corrupt hfiles are moved to the quarantine dir
   */
  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
      boolean quarantine) throws IOException {
    this.conf = conf;
    this.fs = FileSystem.get(conf);
    this.cacheConf = new CacheConfig(conf);
    this.executor = executor;
    this.inQuarantineMode = quarantine;
  }
  /**
   * Checks a path to see if it is a valid hfile. Corrupt or missing files are
   * recorded (and optionally quarantined) rather than rethrown.
   *
   * @param p
   *          full Path to an HFile
   * @throws IOException
   *           on a connectivity-related failure; corruption itself is not rethrown
   */
  protected void checkHFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt HFile " + p, che);
      corrupted.add(p);
      if (inQuarantineMode) {
        Path dest = createQuarantinePath(p);
        LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
        // only attempt the rename if the quarantine parent dir could be created
        boolean success = fs.mkdirs(dest.getParent());
        success = success ? fs.rename(p, dest) : false;
        if (!success) {
          failures.add(p);
        } else {
          quarantined.add(dest);
        }
      }
      return;
    } catch (FileNotFoundException fnfe) {
      LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
      missing.add(p);
    } finally {
      hfilesChecked.addAndGet(1);
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Given a path, generates a new path to where we move a corrupted hfile (bad
   * trailer, no trailer).
   *
   * @param hFile
   *          Path to a corrupt hfile (assumes that it is
   *          HBASE_DIR/table/region/cf/file)
   * @return path to where corrupted files are stored. This should be
   *         HBASE_DIR/.corrupt/table/region/cf/file.
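   *
   * <p>For example, with the default quarantine dir (paths here are
   * illustrative):
   * <pre>
   *   /hbase/tableA/region1/cf1/hfile1
   *     becomes /hbase/.corrupt/tableA/region1/cf1/hfile1
   * </pre>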
   */
  Path createQuarantinePath(Path hFile) {
    // extract the normal dirs structure
    Path cfDir = hFile.getParent();
    Path regionDir = cfDir.getParent();
    Path tableDir = regionDir.getParent();

    // build up the corrupted dirs structure
    Path corruptBaseDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
        "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
    Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
    return corruptHfile;
  }

  /**
   * Check all files in a column family dir.
   *
   * @param cfDir
   *          column family directory
   * @throws IOException
   *           if the directory listing or an hfile check fails
   */
  protected void checkColFamDir(Path cfDir) throws IOException {
    FileStatus[] hfs = null;
    try {
      hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // In Hadoop 0.23+, listStatus throws an exception if the path does not exist.
      LOG.warn("Colfam Directory " + cfDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(cfDir);
      return;
    }

    // In Hadoop 1.0, listStatus does not throw an exception if the path does not exist.
    if (hfs.length == 0 && !fs.exists(cfDir)) {
      LOG.warn("Colfam Directory " + cfDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(cfDir);
      return;
    }
    for (FileStatus hfFs : hfs) {
      Path hf = hfFs.getPath();
      checkHFile(hf);
    }
  }

  /**
   * Check all column families in a region dir.
   *
   * @param regionDir
   *          region directory
   * @throws IOException
   *           if the directory listing or a column family check fails
   */
  protected void checkRegionDir(Path regionDir) throws IOException {
    FileStatus[] cfs = null;
    try {
      cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    } catch (FileNotFoundException fnfe) {
      // In Hadoop 0.23+, listStatus throws an exception if the path does not exist.
      LOG.warn("Region Directory " + regionDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(regionDir);
      return;
    }

    // In Hadoop 1.0, listStatus does not throw an exception if the path does not exist.
    if (cfs.length == 0 && !fs.exists(regionDir)) {
      LOG.warn("Region Directory " + regionDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(regionDir);
      return;
    }

    for (FileStatus cfFs : cfs) {
      Path cfDir = cfFs.getPath();
      checkColFamDir(cfDir);
    }
  }

  /**
   * Check all the regiondirs in the specified tableDir.
   *
   * @param tableDir
   *          path to a table
   * @throws IOException
   *           if the directory listing fails or a region check rethrows one
   */
  void checkTableDir(Path tableDir) throws IOException {
    FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
    if (rds.length == 0 && !fs.exists(tableDir)) {
      // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
      LOG.warn("Table Directory " + tableDir +
          " does not exist.  Likely due to concurrent delete. Skipping.");
      missing.add(tableDir);
      return;
    }

    // Parallelize check at the region dir level
    List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
    List<Future<Void>> rdFutures;

    for (FileStatus rdFs : rds) {
      Path rdDir = rdFs.getPath();
      RegionDirChecker work = new RegionDirChecker(rdDir);
      rdcs.add(work);
    }

    // Submit and wait for completion
    try {
      rdFutures = executor.invokeAll(rdcs);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Region dirs checking interrupted!", ie);
      return;
    }

    for (int i = 0; i < rdFutures.size(); i++) {
      Future<Void> f = rdFutures.get(i);
      try {
        f.get();
      } catch (ExecutionException e) {
        LOG.warn("Failed to quarantine an HFile in regiondir "
            + rdcs.get(i).regionDir, e.getCause());
        // rethrow IOExceptions
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }

        // rethrow RuntimeExceptions
        if (e.getCause() instanceof RuntimeException) {
          throw (RuntimeException) e.getCause();
        }

        // this should never happen
        LOG.error("Unexpected exception encountered", e);
        return; // bailing out.
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.warn("Region dirs check interrupted!", ie);
        // bailing out
        return;
      }
    }
  }

  /**
   * An individual work item for parallelized regiondir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class RegionDirChecker implements Callable<Void> {
    final Path regionDir;

    RegionDirChecker(Path regionDir) {
      this.regionDir = regionDir;
    }

    @Override
    public Void call() throws IOException {
      checkRegionDir(regionDir);
      return null;
    }
  }

  /**
   * Check the specified table dirs for bad hfiles.
   */
  public void checkTables(Collection<Path> tables) throws IOException {
    for (Path t : tables) {
      checkTableDir(t);
    }
  }

  /**
   * @return the set of check failure file paths after checkTables is called.
   */
  public Collection<Path> getFailures() {
    return new HashSet<Path>(failures);
  }

  /**
   * @return the set of corrupted file paths after checkTables is called.
   */
  public Collection<Path> getCorrupted() {
    return new HashSet<Path>(corrupted);
  }

  /**
   * @return number of hfiles checked in the last HFileCorruptionChecker run
   */
  public int getHFilesChecked() {
    return hfilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined paths after checkTables is called.
   */
  public Collection<Path> getQuarantined() {
    return new HashSet<Path>(quarantined);
  }

  /**
   * @return the set of paths that were missing.  Likely due to deletion/moves from
   *  compaction or flushes.
   */
  public Collection<Path> getMissing() {
    return new HashSet<Path>(missing);
  }

  /**
   * Print a human readable summary of hfile quarantining operations.
   * @param out
   *          reporter used to emit the summary lines
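   *
   * <p>Illustrative output shape in quarantine mode (counts vary by run):
   * <pre>
   * Checked 12 hfiles for corruption
   *   HFiles corrupted:                  0
   *     HFiles successfully quarantined: 0
   *     HFiles failed quarantine:        0
   *     HFiles moved while checking:     0
   * Summary: OK =&gt; OK
   * </pre>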
   */
  public void report(ErrorReporter out) {
    out.print("Checked " + hfilesChecked.get() + " hfiles for corruption");
    out.print("  HFiles corrupted:                  " + corrupted.size());
    if (inQuarantineMode) {
      out.print("    HFiles successfully quarantined: " + quarantined.size());
      for (Path sq : quarantined) {
        out.print("      " + sq);
      }
      out.print("    HFiles failed quarantine:        " + failures.size());
      for (Path fq : failures) {
        out.print("      " + fq);
      }
    }
    out.print("    HFiles moved while checking:     " + missing.size());
    for (Path mq : missing) {
      out.print("      " + mq);
    }

    String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
    String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
        : "CORRUPTED";

    if (inQuarantineMode) {
      out.print("Summary: " + initialState + " => " + fixedState);
    } else {
      out.print("Summary: " + initialState);
    }
  }
}