/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
import org.apache.hadoop.hbase.util.HbckErrorReporter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class marches through all of the region's hfiles and verifies that they are all valid
 * files. One just needs to instantiate the class, call {@link #checkTables(Collection)}, and then
 * retrieve the corrupted hfiles (and the quarantined files when running in quarantine mode).
 * <p>
 * The implementation currently parallelizes at the regionDir level.
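 * <p>
 * A minimal usage sketch (the executor sizing and table name below are illustrative, not
 * prescriptive):
 * <pre>{@code
 * ExecutorService executor = Executors.newFixedThreadPool(4);
 * HFileCorruptionChecker checker = new HFileCorruptionChecker(conf, executor, true);
 * Path tableDir = CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf),
 *     TableName.valueOf("my_table"));
 * checker.checkTables(Collections.singletonList(tableDir));
 * Collection<Path> corrupted = checker.getCorrupted();
 * }</pre>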
 */
@InterfaceAudience.Private
public class HFileCorruptionChecker {
  private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class);

  final Configuration conf;
  final FileSystem fs;
  final CacheConfig cacheConf;
  final ExecutorService executor;
  // Result sets are concurrent because region dirs are checked in parallel.
  final Set<Path> corrupted = new ConcurrentSkipListSet<>();
  final Set<Path> failures = new ConcurrentSkipListSet<>();
  final Set<Path> quarantined = new ConcurrentSkipListSet<>();
  final Set<Path> missing = new ConcurrentSkipListSet<>();
  final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>();
  final boolean inQuarantineMode;
  final AtomicInteger hfilesChecked = new AtomicInteger();
  final AtomicInteger mobFilesChecked = new AtomicInteger();

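  /**
   * @param conf configuration used to resolve the root dir and obtain the {@link FileSystem}
   * @param executor executor used to check region directories in parallel
   * @param quarantine if true, corrupt hfiles found are moved to a quarantine directory under
   *          the root dir
   */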
  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
      boolean quarantine) throws IOException {
    this.conf = conf;
    this.fs = FileSystem.get(conf);
    this.cacheConf = CacheConfig.DISABLED;
    this.executor = executor;
    this.inQuarantineMode = quarantine;
  }

  /**
   * Checks a path to see if it is a valid hfile.
   *
   * @param p
   *          full Path to an HFile
   * @throws IOException
   *           only for connectivity-related failures; corrupt or missing files are recorded in
   *           the result sets rather than rethrown
   */
  protected void checkHFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, true, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt HFile " + p, che);
      corrupted.add(p);
      if (inQuarantineMode) {
        Path dest = createQuarantinePath(p);
        LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
        boolean success = fs.mkdirs(dest.getParent());
        success = success && fs.rename(p, dest);
        if (!success) {
          failures.add(p);
        } else {
          quarantined.add(dest);
        }
      }
      return;
    } catch (FileNotFoundException fnfe) {
      LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
      missing.add(p);
    } finally {
      hfilesChecked.addAndGet(1);
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Given a path, generates a new path to where we move a corrupted hfile (bad trailer, no
   * trailer).
   *
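   * <p>
   * A worked example under the layout assumed below (all names hypothetical):
   * <pre>
   * hdfs://nn:8020/hbase/t1/region1/cf1/hfile1
   *   is quarantined to
   * hdfs://nn:8020/hbase/.corrupt/t1/region1/cf1/hfile1
   * </pre>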
   * @param hFile
   *          Path to a corrupt hfile (assumed to be HBASE_DIR/table/region/cf/file)
   * @return path to where corrupted files are stored. This should be
   *         HBASE_DIR/.corrupt/table/region/cf/file.
   */
  Path createQuarantinePath(Path hFile) throws IOException {
    // extract the normal dirs structure
    Path cfDir = hFile.getParent();
    Path regionDir = cfDir.getParent();
    Path tableDir = regionDir.getParent();

    // build up the corrupted dirs structure
    Path corruptBaseDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    if (conf.get("hbase.hfile.quarantine.dir") != null) {
      LOG.warn("hbase.hfile.quarantine.dir is deprecated. Defaulting to " + corruptBaseDir);
    }
    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
    return new Path(corruptFamilyDir, hFile.getName());
  }

  /**
   * Check all files in a column family dir.
   *
   * @param cfDir
   *          column family directory
   * @throws IOException
   *           if a remote or network exception occurs
   */
  protected void checkColFamDir(Path cfDir) throws IOException {
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(cfDir); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Colfam Directory " + cfDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(cfDir);
      return;
    }

    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.isEmpty() && !fs.exists(cfDir)) {
      LOG.warn("Colfam Directory " + cfDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(cfDir);
      return;
    }

    LOG.info("Checking Column Family Directory {}. Number of entries = {}", cfDir, hfs.size());

    for (FileStatus hfFs : hfs) {
      Path hf = hfFs.getPath();
      checkHFile(hf);
    }
  }

  /**
   * Check all files in a mob column family dir.
   *
   * @param cfDir
   *          mob column family directory
   * @throws IOException
   *           if a remote or network exception occurs
   */
  protected void checkMobColFamDir(Path cfDir) throws IOException {
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(cfDir); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Mob colfam Directory " + cfDir +
          " does not exist.  Likely the table is deleted. Skipping.");
      missedMobFiles.add(cfDir);
      return;
    }

    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.isEmpty() && !fs.exists(cfDir)) {
      LOG.warn("Mob colfam Directory " + cfDir +
          " does not exist.  Likely the table is deleted. Skipping.");
      missedMobFiles.add(cfDir);
      return;
    }

    LOG.info("Checking MOB Column Family Directory {}. Number of entries = {}", cfDir, hfs.size());

    for (FileStatus hfFs : hfs) {
      Path hf = hfFs.getPath();
      checkMobFile(hf);
    }
  }

  /**
   * Checks a path to see if it is a valid mob file.
   *
   * @param p
   *          full Path to a mob file
   * @throws IOException
   *           only for connectivity-related failures; corrupt or missing files are recorded in
   *           the result sets rather than rethrown
   */
  protected void checkMobFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, true, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt mob file " + p, che);
      corruptedMobFiles.add(p);
      if (inQuarantineMode) {
        Path dest = createQuarantinePath(p);
        LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
        boolean success = fs.mkdirs(dest.getParent());
        success = success && fs.rename(p, dest);
        if (!success) {
          failureMobFiles.add(p);
        } else {
          quarantinedMobFiles.add(dest);
        }
      }
      return;
    } catch (FileNotFoundException fnfe) {
      LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
      missedMobFiles.add(p);
    } finally {
      mobFilesChecked.addAndGet(1);
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Checks all the mob files of a table.
   * @param regionDir The mob region directory
   * @throws IOException if a remote or network exception occurs
   */
  private void checkMobRegionDir(Path regionDir) throws IOException {
    if (!fs.exists(regionDir)) {
      return;
    }
    FileStatus[] hfs = null;
    try {
      hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Mob directory " + regionDir
        + " does not exist.  Likely the table is deleted. Skipping.");
      missedMobFiles.add(regionDir);
      return;
    }

    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.length == 0 && !fs.exists(regionDir)) {
      LOG.warn("Mob directory " + regionDir
        + " does not exist.  Likely the table is deleted. Skipping.");
      missedMobFiles.add(regionDir);
      return;
    }

    LOG.info("Checking MOB Region Directory {}. Number of entries = {}", regionDir, hfs.length);

    for (FileStatus hfFs : hfs) {
      Path hf = hfFs.getPath();
      checkMobColFamDir(hf);
    }
  }

  /**
   * Check all column families in a region dir.
   *
   * @param regionDir
   *          region directory
   * @throws IOException
   *           if a remote or network exception occurs
   */
  protected void checkRegionDir(Path regionDir) throws IOException {
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(regionDir);
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Region Directory " + regionDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(regionDir);
      return;
    }

    List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (cfs.isEmpty() && !fs.exists(regionDir)) {
      LOG.warn("Region Directory " + regionDir +
          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
      missing.add(regionDir);
      return;
    }

    LOG.info("Checking Region Directory {}. Number of entries = {}", regionDir, cfs.size());

    for (FileStatus cfFs : cfs) {
      Path cfDir = cfFs.getPath();
      checkColFamDir(cfDir);
    }
  }

  /**
   * Check all the regiondirs in the specified tableDir. The table's mob region directory is
   * checked as well.
   *
   * @param tableDir
   *          path to a table
   * @throws IOException
   *           if a remote or network exception occurs
   */
  void checkTableDir(Path tableDir) throws IOException {
    List<FileStatus> rds =
        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      if (!fs.exists(tableDir)) {
        LOG.warn("Table Directory " + tableDir +
            " does not exist.  Likely due to concurrent delete. Skipping.");
        missing.add(tableDir);
      }
      return;
    }

    LOG.info("Checking Table Directory {}. Number of entries (including mob) = {}", tableDir,
        rds.size() + 1);

    // Parallelize check at the region dir level
    List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1);
    List<Future<Void>> rdFutures;

    for (FileStatus rdFs : rds) {
      Path rdDir = rdFs.getPath();
      RegionDirChecker work = new RegionDirChecker(rdDir);
      rdcs.add(work);
    }

    // add mob region
    rdcs.add(createMobRegionDirChecker(tableDir));
    // Submit and wait for completion
    try {
      rdFutures = executor.invokeAll(rdcs);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Region dirs checking interrupted!", ie);
      return;
    }

    for (int i = 0; i < rdFutures.size(); i++) {
      Future<Void> f = rdFutures.get(i);
      try {
        f.get();
      } catch (ExecutionException e) {
        LOG.warn("Failed to quarantine an HFile in regiondir "
            + rdcs.get(i).regionDir, e.getCause());
        // rethrow IOExceptions
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }

        // rethrow RuntimeExceptions
        if (e.getCause() instanceof RuntimeException) {
          throw (RuntimeException) e.getCause();
        }

        // this should never happen
        LOG.error("Unexpected exception encountered", e);
        return; // bailing out.
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.warn("Region dirs check interrupted!", ie);
        // bailing out
        return;
      }
    }
  }

  /**
   * An individual work item for parallelized regiondir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class RegionDirChecker implements Callable<Void> {
    final Path regionDir;

    RegionDirChecker(Path regionDir) {
      this.regionDir = regionDir;
    }

    @Override
    public Void call() throws IOException {
      checkRegionDir(regionDir);
      return null;
    }
  }

  /**
   * An individual work item for parallelized mob dir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class MobRegionDirChecker extends RegionDirChecker {

    MobRegionDirChecker(Path regionDir) {
      super(regionDir);
    }

    @Override
    public Void call() throws IOException {
      checkMobRegionDir(regionDir);
      return null;
    }
  }

  /**
   * Creates an instance of MobRegionDirChecker.
   * @param tableDir The current table directory.
   * @return An instance of MobRegionDirChecker.
   */
  private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
    TableName tableName = CommonFSUtils.getTableName(tableDir);
    Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
    return new MobRegionDirChecker(mobDir);
  }

  /**
   * Check the specified table dirs for bad hfiles.
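   * <p>
   * For example, to check every table under the root dir (a sketch; assumes
   * {@code FSUtils.getTableDirs} is used to enumerate the table directories):
   * <pre>{@code
   * checker.checkTables(FSUtils.getTableDirs(fs, CommonFSUtils.getRootDir(conf)));
   * }</pre>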
   */
  public void checkTables(Collection<Path> tables) throws IOException {
    for (Path t : tables) {
      checkTableDir(t);
    }
  }

  /**
   * @return the set of check failure file paths after checkTables is called.
   */
  public Collection<Path> getFailures() {
    return new HashSet<>(failures);
  }

  /**
   * @return the set of corrupted file paths after checkTables is called.
   */
  public Collection<Path> getCorrupted() {
    return new HashSet<>(corrupted);
  }

  /**
   * @return number of hfiles checked in the last HFileCorruptionChecker run
   */
  public int getHFilesChecked() {
    return hfilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined paths after checkTables is called.
   */
  public Collection<Path> getQuarantined() {
    return new HashSet<>(quarantined);
  }

  /**
   * @return the set of paths that were missing.  Likely due to deletion/moves from
   *  compaction or flushes.
   */
  public Collection<Path> getMissing() {
    return new HashSet<>(missing);
  }

  /**
   * @return the set of check failure mob file paths after checkTables is called.
   */
  public Collection<Path> getFailureMobFiles() {
    return new HashSet<>(failureMobFiles);
  }

  /**
   * @return the set of corrupted mob file paths after checkTables is called.
   */
  public Collection<Path> getCorruptedMobFiles() {
    return new HashSet<>(corruptedMobFiles);
  }

  /**
   * @return number of mob files checked in the last HFileCorruptionChecker run
   */
  public int getMobFilesChecked() {
    return mobFilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined mob file paths after checkTables is called.
   */
  public Collection<Path> getQuarantinedMobFiles() {
    return new HashSet<>(quarantinedMobFiles);
  }

  /**
   * @return the set of mob file paths that were missing.  Likely due to table deletion or
   *  deletion/moves from compaction.
   */
  public Collection<Path> getMissedMobFiles() {
    return new HashSet<>(missedMobFiles);
  }

  /**
   * Print a human readable summary of hfile quarantining operations.
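   * <p>
   * The output is plain text written line by line to the reporter. Abridged shape in
   * non-quarantine mode (counts are illustrative):
   * <pre>
   * Checked 120 hfiles for corruption
   *   HFiles corrupted:                  0
   * Summary: OK
   * </pre>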
   * @param out the error reporter to print the summary to
   */
  public void report(HbckErrorReporter out) {
    out.print("Checked " + hfilesChecked.get() + " hfiles for corruption");
    out.print("  HFiles corrupted:                  " + corrupted.size());
    if (inQuarantineMode) {
      out.print("    HFiles successfully quarantined: " + quarantined.size());
      for (Path sq : quarantined) {
        out.print("      " + sq);
      }
      out.print("    HFiles failed quarantine:        " + failures.size());
      for (Path fq : failures) {
        out.print("      " + fq);
      }
    }
    out.print("    HFiles moved while checking:     " + missing.size());
    for (Path mq : missing) {
      out.print("      " + mq);
    }

    String initialState = (corrupted.isEmpty()) ? "OK" : "CORRUPTED";
    String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
        : "CORRUPTED";

    // print mob-related report
    out.print("Checked " + mobFilesChecked.get() + " Mob files for corruption");
    out.print("  Mob files corrupted:                  " + corruptedMobFiles.size());
    if (inQuarantineMode) {
      out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
      for (Path sq : quarantinedMobFiles) {
        out.print("      " + sq);
      }
      out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
      for (Path fq : failureMobFiles) {
        out.print("      " + fq);
      }
    }
    out.print("    Mob files moved while checking:     " + missedMobFiles.size());
    for (Path mq : missedMobFiles) {
      out.print("      " + mq);
    }
    String initialMobState = (corruptedMobFiles.isEmpty()) ? "OK" : "CORRUPTED";
    String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
        : "CORRUPTED";

    if (inQuarantineMode) {
      out.print("Summary: " + initialState + " => " + fixedState);
      out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
    } else {
      out.print("Summary: " + initialState);
      out.print("Mob summary: " + initialMobState);
    }
  }
}