/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;

/**
 * This class marches through all of the regions' hfiles and verifies that
 * they are all valid files. One just needs to instantiate the class, call
 * {@code checkTables(Collection<Path>)}, and then retrieve the corrupted
 * hfiles (and quarantined files, if in quarantining mode).
 *
 * <p>The implementation currently parallelizes at the regionDir level.
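 *
 * <p>A minimal usage sketch (assuming a {@code Configuration conf} and a
 * {@code Collection<Path> tableDirs} are in scope; the pool size and the
 * quarantine flag are illustrative choices, not requirements of this class):
 * <pre>{@code
 * ExecutorService pool = Executors.newFixedThreadPool(4);
 * HFileCorruptionChecker checker = new HFileCorruptionChecker(conf, pool, true);
 * checker.checkTables(tableDirs);
 * Collection<Path> corrupt = checker.getCorrupted();
 * pool.shutdown();
 * }</pre>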
 */
@InterfaceAudience.Private
public class HFileCorruptionChecker {
  private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);

  final Configuration conf;
  final FileSystem fs;
  final CacheConfig cacheConf;
  final ExecutorService executor;
  final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
  final Set<Path> failures = new ConcurrentSkipListSet<Path>();
  final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
  final Set<Path> missing = new ConcurrentSkipListSet<Path>();
  final boolean inQuarantineMode;
  final AtomicInteger hfilesChecked = new AtomicInteger();

  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
      boolean quarantine) throws IOException {
    this.conf = conf;
    this.fs = FileSystem.get(conf);
    this.cacheConf = new CacheConfig(conf);
    this.executor = executor;
    this.inQuarantineMode = quarantine;
  }

  /**
   * Checks a path to see if it is a valid hfile.
   *
   * @param p
   *          full Path to an HFile
   * @throws IOException
   *           This is a connectivity-related exception
   */
  protected void checkHFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt HFile " + p, che);
      corrupted.add(p);
      if (inQuarantineMode) {
        Path dest = createQuarantinePath(p);
        LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
        // Only attempt the rename if the quarantine directory could be created.
        boolean success = fs.mkdirs(dest.getParent()) && fs.rename(p, dest);
        if (!success) {
          failures.add(p);
        } else {
          quarantined.add(dest);
        }
      }
      return;
    } catch (FileNotFoundException fnfe) {
      LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
      missing.add(p);
    } finally {
      hfilesChecked.addAndGet(1);
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Given a path, generates a new path to where we move a corrupted hfile (bad
   * trailer, no trailer).
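   *
   * <p>For example (paths illustrative only): a corrupt file at
   * {@code /hbase/t1/b2c8a1/cf/f1} is mapped to
   * {@code /hbase/.corrupt/t1/b2c8a1/cf/f1}; the quarantine root directory
   * name comes from {@code hbase.hfile.quarantine.dir} (default
   * {@code .corrupt}).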
   *
   * @param hFile
   *          Path to a corrupt hfile (assumes that it is
   *          HBASE_DIR/table/region/cf/file)
   * @return path to where corrupted files are stored. This should be
   *         HBASE_DIR/.corrupt/table/region/cf/file.
   */
  Path createQuarantinePath(Path hFile) throws IOException {
    // extract the normal dirs structure
    Path cfDir = hFile.getParent();
    Path regionDir = cfDir.getParent();
    Path tableDir = regionDir.getParent();

    // build up the corrupted dirs structure
    Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), conf.get(
        "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
    Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
    return corruptHfile;
  }

  /**
   * Check all files in a column family dir.
   *
   * @param cfDir
   *          column family directory
   * @throws IOException
   */
  protected void checkColFamDir(Path cfDir) throws IOException {
    FileStatus[] hfs = null;
    try {
      hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Colfam Directory " + cfDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(cfDir);
      return;
    }

    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.length == 0 && !fs.exists(cfDir)) {
      LOG.warn("Colfam Directory " + cfDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(cfDir);
      return;
    }
    for (FileStatus hfFs : hfs) {
      Path hf = hfFs.getPath();
      checkHFile(hf);
    }
  }

  /**
   * Check all column families in a region dir.
   *
   * @param regionDir
   *          region directory
   * @throws IOException
   */
  protected void checkRegionDir(Path regionDir) throws IOException {
    FileStatus[] cfs = null;
    try {
      cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Region Directory " + regionDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(regionDir);
      return;
    }

    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (cfs.length == 0 && !fs.exists(regionDir)) {
      LOG.warn("Region Directory " + regionDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(regionDir);
      return;
    }

    for (FileStatus cfFs : cfs) {
      Path cfDir = cfFs.getPath();
      checkColFamDir(cfDir);
    }
  }

  /**
   * Check all the regiondirs in the specified tableDir.
   *
   * @param tableDir
   *          path to a table
   * @throws IOException
   */
  void checkTableDir(Path tableDir) throws IOException {
    FileStatus[] rds = null;
    try {
      rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Table Directory " + tableDir +
          " does not exist.  Likely due to concurrent delete. Skipping.");
      missing.add(tableDir);
      return;
    }

    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (rds.length == 0 && !fs.exists(tableDir)) {
      LOG.warn("Table Directory " + tableDir +
          " does not exist.  Likely due to concurrent delete. Skipping.");
      missing.add(tableDir);
      return;
    }

    // Parallelize check at the region dir level
    List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
    List<Future<Void>> rdFutures;

    for (FileStatus rdFs : rds) {
      Path rdDir = rdFs.getPath();
      RegionDirChecker work = new RegionDirChecker(rdDir);
      rdcs.add(work);
    }

    // Submit and wait for completion
    try {
      rdFutures = executor.invokeAll(rdcs);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Region dirs checking interrupted!", ie);
      return;
    }

    for (int i = 0; i < rdFutures.size(); i++) {
      Future<Void> f = rdFutures.get(i);
      try {
        f.get();
      } catch (ExecutionException e) {
        LOG.warn("Failed to quarantine an HFile in regiondir "
            + rdcs.get(i).regionDir, e.getCause());
        // rethrow IOExceptions
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }

        // rethrow RuntimeExceptions
        if (e.getCause() instanceof RuntimeException) {
          throw (RuntimeException) e.getCause();
        }

        // this should never happen
        LOG.error("Unexpected exception encountered", e);
        return; // bailing out.
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.warn("Region dirs check interrupted!", ie);
        // bailing out
        return;
      }
    }
  }

  /**
   * An individual work item for parallelized regiondir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class RegionDirChecker implements Callable<Void> {
    final Path regionDir;

    RegionDirChecker(Path regionDir) {
      this.regionDir = regionDir;
    }

    @Override
    public Void call() throws IOException {
      checkRegionDir(regionDir);
      return null;
    }
  }

  /**
   * Check the specified table dirs for bad hfiles.
   */
  public void checkTables(Collection<Path> tables) throws IOException {
    for (Path t : tables) {
      checkTableDir(t);
    }
  }

  /**
   * @return the set of check failure file paths after checkTables is called.
   */
  public Collection<Path> getFailures() {
    return new HashSet<Path>(failures);
  }

  /**
   * @return the set of corrupted file paths after checkTables is called.
   */
  public Collection<Path> getCorrupted() {
    return new HashSet<Path>(corrupted);
  }

  /**
   * @return number of hfiles checked in the last HFileCorruptionChecker run
   */
  public int getHFilesChecked() {
    return hfilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined paths after checkTables is called.
   */
  public Collection<Path> getQuarantined() {
    return new HashSet<Path>(quarantined);
  }

  /**
   * @return the set of paths that were missing.  Likely due to deletion/moves from
   *  compaction or flushes.
   */
  public Collection<Path> getMissing() {
    return new HashSet<Path>(missing);
  }

  /**
   * Print a human-readable summary of hfile quarantining operations.
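   *
   * <p>Output shape, for illustration only (counts and paths are made up):
   * <pre>
   * Checked 42 hfiles for corruption
   *   HFiles corrupted:                  1
   *     HFiles successfully quarantined: 1
   *       hdfs://namenode/hbase/.corrupt/t1/b2c8a1/cf/f1
   *     HFiles failed quarantine:        0
   *     HFiles moved while checking:     0
   * Summary: CORRUPTED => OK
   * </pre>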
   * @param out
   *          the reporter to print the summary to
   */
  public void report(ErrorReporter out) {
    out.print("Checked " + hfilesChecked.get() + " hfiles for corruption");
    out.print("  HFiles corrupted:                  " + corrupted.size());
    if (inQuarantineMode) {
      out.print("    HFiles successfully quarantined: " + quarantined.size());
      for (Path sq : quarantined) {
        out.print("      " + sq);
      }
      out.print("    HFiles failed quarantine:        " + failures.size());
      for (Path fq : failures) {
        out.print("      " + fq);
      }
    }
    out.print("    HFiles moved while checking:     " + missing.size());
    for (Path mq : missing) {
      out.print("      " + mq);
    }

    String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
    String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
        : "CORRUPTED";

    if (inQuarantineMode) {
      out.print("Summary: " + initialState + " => " + fixedState);
    } else {
      out.print("Summary: " + initialState);
    }
  }
}