001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util.hbck;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ConcurrentSkipListSet;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Future;
032import java.util.concurrent.atomic.AtomicInteger;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.io.hfile.CacheConfig;
040import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
041import org.apache.hadoop.hbase.io.hfile.HFile;
042import org.apache.hadoop.hbase.mob.MobUtils;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.util.FSUtils;
045import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
046import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
047import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
048import org.apache.hadoop.hbase.util.HbckErrorReporter;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * This class marches through all of the region's hfiles and verifies that they are all valid files.
055 * One just needs to instantiate the class, use checkTables(List<Path>) and then retrieve the
056 * corrupted hfiles (and quarantined files if in quarantining mode) The implementation currently
057 * parallelizes at the regionDir level.
058 */
059@InterfaceAudience.Private
060public class HFileCorruptionChecker {
061  private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class);
062
063  final Configuration conf;
064  final FileSystem fs;
065  final CacheConfig cacheConf;
066  final ExecutorService executor;
067  final Set<Path> corrupted = new ConcurrentSkipListSet<>();
068  final Set<Path> failures = new ConcurrentSkipListSet<>();
069  final Set<Path> quarantined = new ConcurrentSkipListSet<>();
070  final Set<Path> missing = new ConcurrentSkipListSet<>();
071  final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>();
072  final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>();
073  final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>();
074  final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>();
075  final boolean inQuarantineMode;
076  final AtomicInteger hfilesChecked = new AtomicInteger();
077  final AtomicInteger mobFilesChecked = new AtomicInteger();
078
079  public HFileCorruptionChecker(Configuration conf, ExecutorService executor, boolean quarantine)
080    throws IOException {
081    this.conf = conf;
082    this.fs = FileSystem.get(conf);
083    this.cacheConf = CacheConfig.DISABLED;
084    this.executor = executor;
085    this.inQuarantineMode = quarantine;
086  }
087
088  /**
089   * Checks a path to see if it is a valid hfile. n * full Path to an HFile n * This is a
090   * connectivity related exception
091   */
092  protected void checkHFile(Path p) throws IOException {
093    HFile.Reader r = null;
094    try {
095      r = HFile.createReader(fs, p, cacheConf, true, conf);
096    } catch (CorruptHFileException che) {
097      LOG.warn("Found corrupt HFile " + p, che);
098      corrupted.add(p);
099      if (inQuarantineMode) {
100        Path dest = createQuarantinePath(p);
101        LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
102        boolean success = fs.mkdirs(dest.getParent());
103        success = success ? fs.rename(p, dest) : false;
104        if (!success) {
105          failures.add(p);
106        } else {
107          quarantined.add(dest);
108        }
109      }
110      return;
111    } catch (FileNotFoundException fnfe) {
112      LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
113      missing.add(p);
114    } finally {
115      hfilesChecked.addAndGet(1);
116      if (r != null) {
117        r.close(true);
118      }
119    }
120  }
121
122  /**
123   * Given a path, generates a new path to where we move a corrupted hfile (bad trailer, no
124   * trailer). n * Path to a corrupt hfile (assumes that it is HBASE_DIR/ table /region/cf/file)
125   * @return path to where corrupted files are stored. This should be
126   *         HBASE_DIR/.corrupt/table/region/cf/file.
127   */
128  Path createQuarantinePath(Path hFile) throws IOException {
129    // extract the normal dirs structure
130    Path cfDir = hFile.getParent();
131    Path regionDir = cfDir.getParent();
132    Path tableDir = regionDir.getParent();
133
134    // build up the corrupted dirs structure
135    Path corruptBaseDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
136    if (conf.get("hbase.hfile.quarantine.dir") != null) {
137      LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to " + corruptBaseDir);
138    }
139    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
140    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
141    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
142    Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
143    return corruptHfile;
144  }
145
146  /**
147   * Check all files in a column family dir. n * column family directory n
148   */
149  protected void checkColFamDir(Path cfDir) throws IOException {
150    FileStatus[] statuses = null;
151    try {
152      statuses = fs.listStatus(cfDir); // use same filter as scanner.
153    } catch (FileNotFoundException fnfe) {
154      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
155      LOG.warn("Colfam Directory " + cfDir
156        + " does not exist.  Likely due to concurrent split/compaction. Skipping.");
157      missing.add(cfDir);
158      return;
159    }
160
161    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
162    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
163    if (hfs.isEmpty() && !fs.exists(cfDir)) {
164      LOG.warn("Colfam Directory " + cfDir
165        + " does not exist.  Likely due to concurrent split/compaction. Skipping.");
166      missing.add(cfDir);
167      return;
168    }
169
170    LOG.info("Checking Column Family Directory {}. Number of entries = {}", cfDir, hfs.size());
171
172    for (FileStatus hfFs : hfs) {
173      Path hf = hfFs.getPath();
174      checkHFile(hf);
175    }
176  }
177
178  /**
179   * Check all files in a mob column family dir. n * mob column family directory n
180   */
181  protected void checkMobColFamDir(Path cfDir) throws IOException {
182    FileStatus[] statuses = null;
183    try {
184      statuses = fs.listStatus(cfDir); // use same filter as scanner.
185    } catch (FileNotFoundException fnfe) {
186      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
187      LOG.warn("Mob colfam Directory " + cfDir
188        + " does not exist.  Likely the table is deleted. Skipping.");
189      missedMobFiles.add(cfDir);
190      return;
191    }
192
193    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
194    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
195    if (hfs.isEmpty() && !fs.exists(cfDir)) {
196      LOG.warn("Mob colfam Directory " + cfDir
197        + " does not exist.  Likely the table is deleted. Skipping.");
198      missedMobFiles.add(cfDir);
199      return;
200    }
201
202    LOG.info("Checking MOB Column Family Directory {}. Number of entries = {}", cfDir, hfs.size());
203
204    for (FileStatus hfFs : hfs) {
205      Path hf = hfFs.getPath();
206      checkMobFile(hf);
207    }
208  }
209
210  /**
211   * Checks a path to see if it is a valid mob file. n * full Path to a mob file. n * This is a
212   * connectivity related exception
213   */
214  protected void checkMobFile(Path p) throws IOException {
215    HFile.Reader r = null;
216    try {
217      r = HFile.createReader(fs, p, cacheConf, true, conf);
218    } catch (CorruptHFileException che) {
219      LOG.warn("Found corrupt mob file " + p, che);
220      corruptedMobFiles.add(p);
221      if (inQuarantineMode) {
222        Path dest = createQuarantinePath(p);
223        LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
224        boolean success = fs.mkdirs(dest.getParent());
225        success = success ? fs.rename(p, dest) : false;
226        if (!success) {
227          failureMobFiles.add(p);
228        } else {
229          quarantinedMobFiles.add(dest);
230        }
231      }
232      return;
233    } catch (FileNotFoundException fnfe) {
234      LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
235      missedMobFiles.add(p);
236    } finally {
237      mobFilesChecked.addAndGet(1);
238      if (r != null) {
239        r.close(true);
240      }
241    }
242  }
243
244  /**
245   * Checks all the mob files of a table.
246   * @param regionDir The mob region directory n
247   */
248  private void checkMobRegionDir(Path regionDir) throws IOException {
249    if (!fs.exists(regionDir)) {
250      return;
251    }
252    FileStatus[] hfs = null;
253    try {
254      hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
255    } catch (FileNotFoundException fnfe) {
256      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
257      LOG.warn(
258        "Mob directory " + regionDir + " does not exist.  Likely the table is deleted. Skipping.");
259      missedMobFiles.add(regionDir);
260      return;
261    }
262
263    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
264    if (hfs.length == 0 && !fs.exists(regionDir)) {
265      LOG.warn(
266        "Mob directory " + regionDir + " does not exist.  Likely the table is deleted. Skipping.");
267      missedMobFiles.add(regionDir);
268      return;
269    }
270
271    LOG.info("Checking MOB Region Directory {}. Number of entries = {}", regionDir, hfs.length);
272
273    for (FileStatus hfFs : hfs) {
274      Path hf = hfFs.getPath();
275      checkMobColFamDir(hf);
276    }
277  }
278
279  /**
280   * Check all column families in a region dir. n * region directory n
281   */
282  protected void checkRegionDir(Path regionDir) throws IOException {
283    FileStatus[] statuses = null;
284    try {
285      statuses = fs.listStatus(regionDir);
286    } catch (FileNotFoundException fnfe) {
287      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
288      LOG.warn("Region Directory " + regionDir
289        + " does not exist.  Likely due to concurrent split/compaction. Skipping.");
290      missing.add(regionDir);
291      return;
292    }
293
294    List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
295    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
296    if (cfs.isEmpty() && !fs.exists(regionDir)) {
297      LOG.warn("Region Directory " + regionDir
298        + " does not exist.  Likely due to concurrent split/compaction. Skipping.");
299      missing.add(regionDir);
300      return;
301    }
302
303    LOG.info("Checking Region Directory {}. Number of entries = {}", regionDir, cfs.size());
304
305    for (FileStatus cfFs : cfs) {
306      Path cfDir = cfFs.getPath();
307      checkColFamDir(cfDir);
308    }
309  }
310
311  /**
312   * Check all the regiondirs in the specified tableDir n * path to a table n
313   */
314  void checkTableDir(Path tableDir) throws IOException {
315    List<FileStatus> rds =
316      FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
317    if (rds == null) {
318      if (!fs.exists(tableDir)) {
319        LOG.warn("Table Directory " + tableDir
320          + " does not exist.  Likely due to concurrent delete. Skipping.");
321        missing.add(tableDir);
322      }
323      return;
324    }
325
326    LOG.info("Checking Table Directory {}. Number of entries (including mob) = {}", tableDir,
327      rds.size() + 1);
328
329    // Parallelize check at the region dir level
330    List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1);
331    List<Future<Void>> rdFutures;
332
333    for (FileStatus rdFs : rds) {
334      Path rdDir = rdFs.getPath();
335      RegionDirChecker work = new RegionDirChecker(rdDir);
336      rdcs.add(work);
337    }
338
339    // add mob region
340    rdcs.add(createMobRegionDirChecker(tableDir));
341    // Submit and wait for completion
342    try {
343      rdFutures = executor.invokeAll(rdcs);
344    } catch (InterruptedException ie) {
345      Thread.currentThread().interrupt();
346      LOG.warn("Region dirs checking interrupted!", ie);
347      return;
348    }
349
350    for (int i = 0; i < rdFutures.size(); i++) {
351      Future<Void> f = rdFutures.get(i);
352      try {
353        f.get();
354      } catch (ExecutionException e) {
355        LOG.warn("Failed to quarantine an HFile in regiondir " + rdcs.get(i).regionDir,
356          e.getCause());
357        // rethrow IOExceptions
358        if (e.getCause() instanceof IOException) {
359          throw (IOException) e.getCause();
360        }
361
362        // rethrow RuntimeExceptions
363        if (e.getCause() instanceof RuntimeException) {
364          throw (RuntimeException) e.getCause();
365        }
366
367        // this should never happen
368        LOG.error("Unexpected exception encountered", e);
369        return; // bailing out.
370      } catch (InterruptedException ie) {
371        Thread.currentThread().interrupt();
372        LOG.warn("Region dirs check interrupted!", ie);
373        // bailing out
374        return;
375      }
376    }
377  }
378
379  /**
380   * An individual work item for parallelized regiondir processing. This is intentionally an inner
381   * class so it can use the shared error sets and fs.
382   */
383  private class RegionDirChecker implements Callable<Void> {
384    final Path regionDir;
385
386    RegionDirChecker(Path regionDir) {
387      this.regionDir = regionDir;
388    }
389
390    @Override
391    public Void call() throws IOException {
392      checkRegionDir(regionDir);
393      return null;
394    }
395  }
396
397  /**
398   * An individual work item for parallelized mob dir processing. This is intentionally an inner
399   * class so it can use the shared error sets and fs.
400   */
401  private class MobRegionDirChecker extends RegionDirChecker {
402
403    MobRegionDirChecker(Path regionDir) {
404      super(regionDir);
405    }
406
407    @Override
408    public Void call() throws IOException {
409      checkMobRegionDir(regionDir);
410      return null;
411    }
412  }
413
414  /**
415   * Creates an instance of MobRegionDirChecker.
416   * @param tableDir The current table directory.
417   * @return An instance of MobRegionDirChecker.
418   */
419  private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
420    TableName tableName = CommonFSUtils.getTableName(tableDir);
421    Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
422    return new MobRegionDirChecker(mobDir);
423  }
424
425  /**
426   * Check the specified table dirs for bad hfiles.
427   */
428  public void checkTables(Collection<Path> tables) throws IOException {
429    for (Path t : tables) {
430      checkTableDir(t);
431    }
432  }
433
434  /**
435   * @return the set of check failure file paths after checkTables is called.
436   */
437  public Collection<Path> getFailures() {
438    return new HashSet<>(failures);
439  }
440
441  /**
442   * @return the set of corrupted file paths after checkTables is called.
443   */
444  public Collection<Path> getCorrupted() {
445    return new HashSet<>(corrupted);
446  }
447
448  /**
449   * @return number of hfiles checked in the last HfileCorruptionChecker run
450   */
451  public int getHFilesChecked() {
452    return hfilesChecked.get();
453  }
454
455  /**
456   * @return the set of successfully quarantined paths after checkTables is called.
457   */
458  public Collection<Path> getQuarantined() {
459    return new HashSet<>(quarantined);
460  }
461
462  /**
463   * @return the set of paths that were missing. Likely due to deletion/moves from compaction or
464   *         flushes.
465   */
466  public Collection<Path> getMissing() {
467    return new HashSet<>(missing);
468  }
469
470  /**
471   * @return the set of check failure mob file paths after checkTables is called.
472   */
473  public Collection<Path> getFailureMobFiles() {
474    return new HashSet<>(failureMobFiles);
475  }
476
477  /**
478   * @return the set of corrupted mob file paths after checkTables is called.
479   */
480  public Collection<Path> getCorruptedMobFiles() {
481    return new HashSet<>(corruptedMobFiles);
482  }
483
484  /**
485   * @return number of mob files checked in the last HfileCorruptionChecker run
486   */
487  public int getMobFilesChecked() {
488    return mobFilesChecked.get();
489  }
490
491  /**
492   * @return the set of successfully quarantined paths after checkTables is called.
493   */
494  public Collection<Path> getQuarantinedMobFiles() {
495    return new HashSet<>(quarantinedMobFiles);
496  }
497
498  /**
499   * @return the set of paths that were missing. Likely due to table deletion or deletion/moves from
500   *         compaction.
501   */
502  public Collection<Path> getMissedMobFiles() {
503    return new HashSet<>(missedMobFiles);
504  }
505
506  /**
507   * Print a human readable summary of hfile quarantining operations. n
508   */
509  public void report(HbckErrorReporter out) {
510    out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
511    out.print("  HFiles corrupted:                  " + corrupted.size());
512    if (inQuarantineMode) {
513      out.print("    HFiles successfully quarantined: " + quarantined.size());
514      for (Path sq : quarantined) {
515        out.print("      " + sq);
516      }
517      out.print("    HFiles failed quarantine:        " + failures.size());
518      for (Path fq : failures) {
519        out.print("      " + fq);
520      }
521    }
522    out.print("    HFiles moved while checking:     " + missing.size());
523    for (Path mq : missing) {
524      out.print("      " + mq);
525    }
526
527    String initialState = (corrupted.isEmpty()) ? "OK" : "CORRUPTED";
528    String fixedState = (corrupted.size() == quarantined.size()) ? "OK" : "CORRUPTED";
529
530    // print mob-related report
531    out.print("Checked " + mobFilesChecked.get() + " Mob files for corruption");
532    out.print("  Mob files corrupted:                  " + corruptedMobFiles.size());
533    if (inQuarantineMode) {
534      out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
535      for (Path sq : quarantinedMobFiles) {
536        out.print("      " + sq);
537      }
538      out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
539      for (Path fq : failureMobFiles) {
540        out.print("      " + fq);
541      }
542    }
543    out.print("    Mob files moved while checking:     " + missedMobFiles.size());
544    for (Path mq : missedMobFiles) {
545      out.print("      " + mq);
546    }
547    String initialMobState = (corruptedMobFiles.isEmpty()) ? "OK" : "CORRUPTED";
548    String fixedMobState =
549      (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK" : "CORRUPTED";
550
551    if (inQuarantineMode) {
552      out.print("Summary: " + initialState + " => " + fixedState);
553      out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
554    } else {
555      out.print("Summary: " + initialState);
556      out.print("Mob summary: " + initialMobState);
557    }
558  }
559}