001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util.hbck;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ConcurrentSkipListSet;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Future;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import org.apache.hadoop.hbase.util.HbckErrorReporter;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.io.hfile.CacheConfig;
045import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
046import org.apache.hadoop.hbase.io.hfile.HFile;
047import org.apache.hadoop.hbase.mob.MobUtils;
048import org.apache.hadoop.hbase.util.FSUtils;
049import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
050import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
051import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
052
053/**
054 * This class marches through all of the region's hfiles and verifies that
055 * they are all valid files. One just needs to instantiate the class, use
056 * checkTables(List<Path>) and then retrieve the corrupted hfiles (and
057 * quarantined files if in quarantining mode)
058 *
059 * The implementation currently parallelizes at the regionDir level.
060 */
061@InterfaceAudience.Private
062public class HFileCorruptionChecker {
063  private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class);
064
065  final Configuration conf;
066  final FileSystem fs;
067  final CacheConfig cacheConf;
068  final ExecutorService executor;
069  final Set<Path> corrupted = new ConcurrentSkipListSet<>();
070  final Set<Path> failures = new ConcurrentSkipListSet<>();
071  final Set<Path> quarantined = new ConcurrentSkipListSet<>();
072  final Set<Path> missing = new ConcurrentSkipListSet<>();
073  final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>();
074  final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>();
075  final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>();
076  final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>();
077  final boolean inQuarantineMode;
078  final AtomicInteger hfilesChecked = new AtomicInteger();
079  final AtomicInteger mobFilesChecked = new AtomicInteger();
080
081  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
082      boolean quarantine) throws IOException {
083    this.conf = conf;
084    this.fs = FileSystem.get(conf);
085    this.cacheConf = CacheConfig.DISABLED;
086    this.executor = executor;
087    this.inQuarantineMode = quarantine;
088  }
089
090  /**
091   * Checks a path to see if it is a valid hfile.
092   *
093   * @param p
094   *          full Path to an HFile
095   * @throws IOException
096   *           This is a connectivity related exception
097   */
098  protected void checkHFile(Path p) throws IOException {
099    HFile.Reader r = null;
100    try {
101      r = HFile.createReader(fs, p, cacheConf, true, conf);
102    } catch (CorruptHFileException che) {
103      LOG.warn("Found corrupt HFile " + p, che);
104      corrupted.add(p);
105      if (inQuarantineMode) {
106        Path dest = createQuarantinePath(p);
107        LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
108        boolean success = fs.mkdirs(dest.getParent());
109        success = success ? fs.rename(p, dest): false;
110        if (!success) {
111          failures.add(p);
112        } else {
113          quarantined.add(dest);
114        }
115      }
116      return;
117    } catch (FileNotFoundException fnfe) {
118      LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
119      missing.add(p);
120    } finally {
121      hfilesChecked.addAndGet(1);
122      if (r != null) {
123        r.close(true);
124      }
125    }
126  }
127
128  /**
129   * Given a path, generates a new path to where we move a corrupted hfile (bad
130   * trailer, no trailer).
131   *
132   * @param hFile
133   *          Path to a corrupt hfile (assumes that it is HBASE_DIR/ table
134   *          /region/cf/file)
135   * @return path to where corrupted files are stored. This should be
136   *         HBASE_DIR/.corrupt/table/region/cf/file.
137   */
138  Path createQuarantinePath(Path hFile) throws IOException {
139    // extract the normal dirs structure
140    Path cfDir = hFile.getParent();
141    Path regionDir = cfDir.getParent();
142    Path tableDir = regionDir.getParent();
143
144    // build up the corrupted dirs structure
145    Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
146    if (conf.get("hbase.hfile.quarantine.dir") != null) {
147      LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to " + corruptBaseDir);
148    }
149    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
150    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
151    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
152    Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
153    return corruptHfile;
154  }
155
156  /**
157   * Check all files in a column family dir.
158   *
159   * @param cfDir
160   *          column family directory
161   * @throws IOException
162   */
163  protected void checkColFamDir(Path cfDir) throws IOException {
164    FileStatus[] statuses = null;
165    try {
166      statuses = fs.listStatus(cfDir); // use same filter as scanner.
167    } catch (FileNotFoundException fnfe) {
168      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
169      LOG.warn("Colfam Directory " + cfDir +
170          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
171      missing.add(cfDir);
172      return;
173    }
174
175    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
176    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
177    if (hfs.isEmpty() && !fs.exists(cfDir)) {
178      LOG.warn("Colfam Directory " + cfDir +
179          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
180      missing.add(cfDir);
181      return;
182    }
183    for (FileStatus hfFs : hfs) {
184      Path hf = hfFs.getPath();
185      checkHFile(hf);
186    }
187  }
188
189  /**
190   * Check all files in a mob column family dir.
191   *
192   * @param cfDir
193   *          mob column family directory
194   * @throws IOException
195   */
196  protected void checkMobColFamDir(Path cfDir) throws IOException {
197    FileStatus[] statuses = null;
198    try {
199      statuses = fs.listStatus(cfDir); // use same filter as scanner.
200    } catch (FileNotFoundException fnfe) {
201      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
202      LOG.warn("Mob colfam Directory " + cfDir +
203          " does not exist.  Likely the table is deleted. Skipping.");
204      missedMobFiles.add(cfDir);
205      return;
206    }
207
208    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
209    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
210    if (hfs.isEmpty() && !fs.exists(cfDir)) {
211      LOG.warn("Mob colfam Directory " + cfDir +
212          " does not exist.  Likely the table is deleted. Skipping.");
213      missedMobFiles.add(cfDir);
214      return;
215    }
216    for (FileStatus hfFs : hfs) {
217      Path hf = hfFs.getPath();
218      checkMobFile(hf);
219    }
220  }
221
222  /**
223   * Checks a path to see if it is a valid mob file.
224   *
225   * @param p
226   *          full Path to a mob file.
227   * @throws IOException
228   *           This is a connectivity related exception
229   */
230  protected void checkMobFile(Path p) throws IOException {
231    HFile.Reader r = null;
232    try {
233      r = HFile.createReader(fs, p, cacheConf, true, conf);
234    } catch (CorruptHFileException che) {
235      LOG.warn("Found corrupt mob file " + p, che);
236      corruptedMobFiles.add(p);
237      if (inQuarantineMode) {
238        Path dest = createQuarantinePath(p);
239        LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
240        boolean success = fs.mkdirs(dest.getParent());
241        success = success ? fs.rename(p, dest): false;
242        if (!success) {
243          failureMobFiles.add(p);
244        } else {
245          quarantinedMobFiles.add(dest);
246        }
247      }
248      return;
249    } catch (FileNotFoundException fnfe) {
250      LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
251      missedMobFiles.add(p);
252    } finally {
253      mobFilesChecked.addAndGet(1);
254      if (r != null) {
255        r.close(true);
256      }
257    }
258  }
259
260  /**
261   * Checks all the mob files of a table.
262   * @param regionDir The mob region directory
263   * @throws IOException
264   */
265  private void checkMobRegionDir(Path regionDir) throws IOException {
266    if (!fs.exists(regionDir)) {
267      return;
268    }
269    FileStatus[] hfs = null;
270    try {
271      hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
272    } catch (FileNotFoundException fnfe) {
273      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
274      LOG.warn("Mob directory " + regionDir
275        + " does not exist.  Likely the table is deleted. Skipping.");
276      missedMobFiles.add(regionDir);
277      return;
278    }
279
280    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
281    if (hfs.length == 0 && !fs.exists(regionDir)) {
282      LOG.warn("Mob directory " + regionDir
283        + " does not exist.  Likely the table is deleted. Skipping.");
284      missedMobFiles.add(regionDir);
285      return;
286    }
287    for (FileStatus hfFs : hfs) {
288      Path hf = hfFs.getPath();
289      checkMobColFamDir(hf);
290    }
291  }
292
293  /**
294   * Check all column families in a region dir.
295   *
296   * @param regionDir
297   *          region directory
298   * @throws IOException
299   */
300  protected void checkRegionDir(Path regionDir) throws IOException {
301    FileStatus[] statuses = null;
302    try {
303      statuses = fs.listStatus(regionDir);
304    } catch (FileNotFoundException fnfe) {
305      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
306      LOG.warn("Region Directory " + regionDir +
307          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
308      missing.add(regionDir);
309      return;
310    }
311
312    List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
313    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
314    if (cfs.isEmpty() && !fs.exists(regionDir)) {
315      LOG.warn("Region Directory " + regionDir +
316          " does not exist.  Likely due to concurrent split/compaction. Skipping.");
317      missing.add(regionDir);
318      return;
319    }
320
321    for (FileStatus cfFs : cfs) {
322      Path cfDir = cfFs.getPath();
323      checkColFamDir(cfDir);
324    }
325  }
326
327  /**
328   * Check all the regiondirs in the specified tableDir
329   *
330   * @param tableDir
331   *          path to a table
332   * @throws IOException
333   */
334  void checkTableDir(Path tableDir) throws IOException {
335    List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
336    if (rds == null) {
337      if (!fs.exists(tableDir)) {
338        LOG.warn("Table Directory " + tableDir +
339            " does not exist.  Likely due to concurrent delete. Skipping.");
340        missing.add(tableDir);
341      }
342      return;
343    }
344
345    // Parallelize check at the region dir level
346    List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1);
347    List<Future<Void>> rdFutures;
348
349    for (FileStatus rdFs : rds) {
350      Path rdDir = rdFs.getPath();
351      RegionDirChecker work = new RegionDirChecker(rdDir);
352      rdcs.add(work);
353    }
354
355    // add mob region
356    rdcs.add(createMobRegionDirChecker(tableDir));
357    // Submit and wait for completion
358    try {
359      rdFutures = executor.invokeAll(rdcs);
360    } catch (InterruptedException ie) {
361      Thread.currentThread().interrupt();
362      LOG.warn("Region dirs checking interrupted!", ie);
363      return;
364    }
365
366    for (int i = 0; i < rdFutures.size(); i++) {
367      Future<Void> f = rdFutures.get(i);
368      try {
369        f.get();
370      } catch (ExecutionException e) {
371        LOG.warn("Failed to quarantine an HFile in regiondir "
372            + rdcs.get(i).regionDir, e.getCause());
373        // rethrow IOExceptions
374        if (e.getCause() instanceof IOException) {
375          throw (IOException) e.getCause();
376        }
377
378        // rethrow RuntimeExceptions
379        if (e.getCause() instanceof RuntimeException) {
380          throw (RuntimeException) e.getCause();
381        }
382
383        // this should never happen
384        LOG.error("Unexpected exception encountered", e);
385        return; // bailing out.
386      } catch (InterruptedException ie) {
387        Thread.currentThread().interrupt();
388        LOG.warn("Region dirs check interrupted!", ie);
389        // bailing out
390        return;
391      }
392    }
393  }
394
395  /**
396   * An individual work item for parallelized regiondir processing. This is
397   * intentionally an inner class so it can use the shared error sets and fs.
398   */
399  private class RegionDirChecker implements Callable<Void> {
400    final Path regionDir;
401
402    RegionDirChecker(Path regionDir) {
403      this.regionDir = regionDir;
404    }
405
406    @Override
407    public Void call() throws IOException {
408      checkRegionDir(regionDir);
409      return null;
410    }
411  }
412
413  /**
414   * An individual work item for parallelized mob dir processing. This is
415   * intentionally an inner class so it can use the shared error sets and fs.
416   */
417  private class MobRegionDirChecker extends RegionDirChecker {
418
419    MobRegionDirChecker(Path regionDir) {
420      super(regionDir);
421    }
422
423    @Override
424    public Void call() throws IOException {
425      checkMobRegionDir(regionDir);
426      return null;
427    }
428  }
429
430  /**
431   * Creates an instance of MobRegionDirChecker.
432   * @param tableDir The current table directory.
433   * @return An instance of MobRegionDirChecker.
434   */
435  private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
436    TableName tableName = FSUtils.getTableName(tableDir);
437    Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
438    return new MobRegionDirChecker(mobDir);
439  }
440
441  /**
442   * Check the specified table dirs for bad hfiles.
443   */
444  public void checkTables(Collection<Path> tables) throws IOException {
445    for (Path t : tables) {
446      checkTableDir(t);
447    }
448  }
449
450  /**
451   * @return the set of check failure file paths after checkTables is called.
452   */
453  public Collection<Path> getFailures() {
454    return new HashSet<>(failures);
455  }
456
457  /**
458   * @return the set of corrupted file paths after checkTables is called.
459   */
460  public Collection<Path> getCorrupted() {
461    return new HashSet<>(corrupted);
462  }
463
464  /**
465   * @return number of hfiles checked in the last HfileCorruptionChecker run
466   */
467  public int getHFilesChecked() {
468    return hfilesChecked.get();
469  }
470
471  /**
472   * @return the set of successfully quarantined paths after checkTables is called.
473   */
474  public Collection<Path> getQuarantined() {
475    return new HashSet<>(quarantined);
476  }
477
478  /**
479   * @return the set of paths that were missing.  Likely due to deletion/moves from
480   *  compaction or flushes.
481   */
482  public Collection<Path> getMissing() {
483    return new HashSet<>(missing);
484  }
485
486  /**
487   * @return the set of check failure mob file paths after checkTables is called.
488   */
489  public Collection<Path> getFailureMobFiles() {
490    return new HashSet<>(failureMobFiles);
491  }
492
493  /**
494   * @return the set of corrupted mob file paths after checkTables is called.
495   */
496  public Collection<Path> getCorruptedMobFiles() {
497    return new HashSet<>(corruptedMobFiles);
498  }
499
500  /**
501   * @return number of mob files checked in the last HfileCorruptionChecker run
502   */
503  public int getMobFilesChecked() {
504    return mobFilesChecked.get();
505  }
506
507  /**
508   * @return the set of successfully quarantined paths after checkTables is called.
509   */
510  public Collection<Path> getQuarantinedMobFiles() {
511    return new HashSet<>(quarantinedMobFiles);
512  }
513
514  /**
515   * @return the set of paths that were missing.  Likely due to table deletion or
516   *  deletion/moves from compaction.
517   */
518  public Collection<Path> getMissedMobFiles() {
519    return new HashSet<>(missedMobFiles);
520  }
521
522  /**
523   * Print a human readable summary of hfile quarantining operations.
524   * @param out
525   */
526  public void report(HbckErrorReporter out) {
527    out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
528    out.print("  HFiles corrupted:                  " + corrupted.size());
529    if (inQuarantineMode) {
530      out.print("    HFiles successfully quarantined: " + quarantined.size());
531      for (Path sq : quarantined) {
532        out.print("      " + sq);
533      }
534      out.print("    HFiles failed quarantine:        " + failures.size());
535      for (Path fq : failures) {
536        out.print("      " + fq);
537      }
538    }
539    out.print("    HFiles moved while checking:     " + missing.size());
540    for (Path mq : missing) {
541      out.print("      " + mq);
542    }
543
544    String initialState = (corrupted.isEmpty()) ? "OK" : "CORRUPTED";
545    String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
546        : "CORRUPTED";
547
548    // print mob-related report
549    if (inQuarantineMode) {
550      out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
551      for (Path sq : quarantinedMobFiles) {
552        out.print("      " + sq);
553      }
554      out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
555      for (Path fq : failureMobFiles) {
556        out.print("      " + fq);
557      }
558    }
559    out.print("    Mob files moved while checking:     " + missedMobFiles.size());
560    for (Path mq : missedMobFiles) {
561      out.print("      " + mq);
562    }
563    String initialMobState = (corruptedMobFiles.isEmpty()) ? "OK" : "CORRUPTED";
564    String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
565        : "CORRUPTED";
566
567    if (inQuarantineMode) {
568      out.print("Summary: " + initialState + " => " + fixedState);
569      out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
570    } else {
571      out.print("Summary: " + initialState);
572      out.print("Mob summary: " + initialMobState);
573    }
574  }
575}