001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util.hbck; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ConcurrentSkipListSet; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Future; 032import java.util.concurrent.atomic.AtomicInteger; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.io.hfile.CacheConfig; 040import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; 041import org.apache.hadoop.hbase.io.hfile.HFile; 042import org.apache.hadoop.hbase.mob.MobUtils; 043import org.apache.hadoop.hbase.util.CommonFSUtils; 044import org.apache.hadoop.hbase.util.FSUtils; 045import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter; 046import org.apache.hadoop.hbase.util.FSUtils.HFileFilter; 047import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter; 048import org.apache.hadoop.hbase.util.HbckErrorReporter; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * This class marches through all of the region's hfiles and verifies that they are all valid files. 055 * One just needs to instantiate the class, use checkTables(List<Path>) and then retrieve the 056 * corrupted hfiles (and quarantined files if in quarantining mode) The implementation currently 057 * parallelizes at the regionDir level. 058 */ 059@InterfaceAudience.Private 060public class HFileCorruptionChecker { 061 private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class); 062 063 final Configuration conf; 064 final FileSystem fs; 065 final CacheConfig cacheConf; 066 final ExecutorService executor; 067 final Set<Path> corrupted = new ConcurrentSkipListSet<>(); 068 final Set<Path> failures = new ConcurrentSkipListSet<>(); 069 final Set<Path> quarantined = new ConcurrentSkipListSet<>(); 070 final Set<Path> missing = new ConcurrentSkipListSet<>(); 071 final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>(); 072 final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>(); 073 final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>(); 074 final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>(); 075 final boolean inQuarantineMode; 076 final AtomicInteger hfilesChecked = new AtomicInteger(); 077 final AtomicInteger mobFilesChecked = new AtomicInteger(); 078 079 public HFileCorruptionChecker(Configuration conf, ExecutorService executor, boolean quarantine) 080 throws IOException { 081 this.conf = conf; 082 this.fs = FileSystem.get(conf); 083 this.cacheConf = CacheConfig.DISABLED; 084 this.executor = executor; 085 this.inQuarantineMode = quarantine; 086 } 087 088 /** 089 * Checks a path to see if it is a valid hfile. full Path to an HFile This is a connectivity 090 * related exception 091 */ 092 protected void checkHFile(Path p) throws IOException { 093 HFile.Reader r = null; 094 try { 095 r = HFile.createReader(fs, p, cacheConf, true, conf); 096 } catch (CorruptHFileException che) { 097 LOG.warn("Found corrupt HFile " + p, che); 098 corrupted.add(p); 099 if (inQuarantineMode) { 100 Path dest = createQuarantinePath(p); 101 LOG.warn("Quarantining corrupt HFile " + p + " into " + dest); 102 boolean success = fs.mkdirs(dest.getParent()); 103 success = success ? fs.rename(p, dest) : false; 104 if (!success) { 105 failures.add(p); 106 } else { 107 quarantined.add(dest); 108 } 109 } 110 return; 111 } catch (FileNotFoundException fnfe) { 112 LOG.warn("HFile " + p + " was missing. Likely removed due to compaction/split?"); 113 missing.add(p); 114 } finally { 115 hfilesChecked.addAndGet(1); 116 if (r != null) { 117 r.close(true); 118 } 119 } 120 } 121 122 /** 123 * Given a path, generates a new path to where we move a corrupted hfile (bad trailer, no 124 * trailer). Path to a corrupt hfile (assumes that it is HBASE_DIR/ table /region/cf/file) 125 * @return path to where corrupted files are stored. This should be 126 * HBASE_DIR/.corrupt/table/region/cf/file. 127 */ 128 Path createQuarantinePath(Path hFile) throws IOException { 129 // extract the normal dirs structure 130 Path cfDir = hFile.getParent(); 131 Path regionDir = cfDir.getParent(); 132 Path tableDir = regionDir.getParent(); 133 134 // build up the corrupted dirs structure 135 Path corruptBaseDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME); 136 if (conf.get("hbase.hfile.quarantine.dir") != null) { 137 LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to " + corruptBaseDir); 138 } 139 Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName()); 140 Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName()); 141 Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName()); 142 Path corruptHfile = new Path(corruptFamilyDir, hFile.getName()); 143 return corruptHfile; 144 } 145 146 /** 147 * Check all files in a column family dir. column family directory 148 */ 149 protected void checkColFamDir(Path cfDir) throws IOException { 150 FileStatus[] statuses = null; 151 try { 152 statuses = fs.listStatus(cfDir); // use same filter as scanner. 153 } catch (FileNotFoundException fnfe) { 154 // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. 155 LOG.warn("Colfam Directory " + cfDir 156 + " does not exist. Likely due to concurrent split/compaction. Skipping."); 157 missing.add(cfDir); 158 return; 159 } 160 161 List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs)); 162 // Hadoop 1.0 listStatus does not throw an exception if the path does not exist. 163 if (hfs.isEmpty() && !fs.exists(cfDir)) { 164 LOG.warn("Colfam Directory " + cfDir 165 + " does not exist. Likely due to concurrent split/compaction. Skipping."); 166 missing.add(cfDir); 167 return; 168 } 169 170 LOG.info("Checking Column Family Directory {}. Number of entries = {}", cfDir, hfs.size()); 171 172 for (FileStatus hfFs : hfs) { 173 Path hf = hfFs.getPath(); 174 checkHFile(hf); 175 } 176 } 177 178 /** 179 * Check all files in a mob column family dir. mob column family directory 180 */ 181 protected void checkMobColFamDir(Path cfDir) throws IOException { 182 FileStatus[] statuses = null; 183 try { 184 statuses = fs.listStatus(cfDir); // use same filter as scanner. 185 } catch (FileNotFoundException fnfe) { 186 // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. 187 LOG.warn("Mob colfam Directory " + cfDir 188 + " does not exist. Likely the table is deleted. Skipping."); 189 missedMobFiles.add(cfDir); 190 return; 191 } 192 193 List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs)); 194 // Hadoop 1.0 listStatus does not throw an exception if the path does not exist. 195 if (hfs.isEmpty() && !fs.exists(cfDir)) { 196 LOG.warn("Mob colfam Directory " + cfDir 197 + " does not exist. Likely the table is deleted. Skipping."); 198 missedMobFiles.add(cfDir); 199 return; 200 } 201 202 LOG.info("Checking MOB Column Family Directory {}. Number of entries = {}", cfDir, hfs.size()); 203 204 for (FileStatus hfFs : hfs) { 205 Path hf = hfFs.getPath(); 206 checkMobFile(hf); 207 } 208 } 209 210 /** 211 * Checks a path to see if it is a valid mob file. full Path to a mob file. This is a connectivity 212 * related exception 213 */ 214 protected void checkMobFile(Path p) throws IOException { 215 HFile.Reader r = null; 216 try { 217 r = HFile.createReader(fs, p, cacheConf, true, conf); 218 } catch (CorruptHFileException che) { 219 LOG.warn("Found corrupt mob file " + p, che); 220 corruptedMobFiles.add(p); 221 if (inQuarantineMode) { 222 Path dest = createQuarantinePath(p); 223 LOG.warn("Quarantining corrupt mob file " + p + " into " + dest); 224 boolean success = fs.mkdirs(dest.getParent()); 225 success = success ? fs.rename(p, dest) : false; 226 if (!success) { 227 failureMobFiles.add(p); 228 } else { 229 quarantinedMobFiles.add(dest); 230 } 231 } 232 return; 233 } catch (FileNotFoundException fnfe) { 234 LOG.warn("Mob file " + p + " was missing. Likely removed due to compaction?"); 235 missedMobFiles.add(p); 236 } finally { 237 mobFilesChecked.addAndGet(1); 238 if (r != null) { 239 r.close(true); 240 } 241 } 242 } 243 244 /** 245 * Checks all the mob files of a table. 246 * @param regionDir The mob region directory 247 */ 248 private void checkMobRegionDir(Path regionDir) throws IOException { 249 if (!fs.exists(regionDir)) { 250 return; 251 } 252 FileStatus[] hfs = null; 253 try { 254 hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs)); 255 } catch (FileNotFoundException fnfe) { 256 // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. 257 LOG.warn( 258 "Mob directory " + regionDir + " does not exist. Likely the table is deleted. Skipping."); 259 missedMobFiles.add(regionDir); 260 return; 261 } 262 263 // Hadoop 1.0 listStatus does not throw an exception if the path does not exist. 264 if (hfs.length == 0 && !fs.exists(regionDir)) { 265 LOG.warn( 266 "Mob directory " + regionDir + " does not exist. Likely the table is deleted. Skipping."); 267 missedMobFiles.add(regionDir); 268 return; 269 } 270 271 LOG.info("Checking MOB Region Directory {}. Number of entries = {}", regionDir, hfs.length); 272 273 for (FileStatus hfFs : hfs) { 274 Path hf = hfFs.getPath(); 275 checkMobColFamDir(hf); 276 } 277 } 278 279 /** 280 * Check all column families in a region dir. region directory 281 */ 282 protected void checkRegionDir(Path regionDir) throws IOException { 283 FileStatus[] statuses = null; 284 try { 285 statuses = fs.listStatus(regionDir); 286 } catch (FileNotFoundException fnfe) { 287 // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. 288 LOG.warn("Region Directory " + regionDir 289 + " does not exist. Likely due to concurrent split/compaction. Skipping."); 290 missing.add(regionDir); 291 return; 292 } 293 294 List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs)); 295 // Hadoop 1.0 listStatus does not throw an exception if the path does not exist. 296 if (cfs.isEmpty() && !fs.exists(regionDir)) { 297 LOG.warn("Region Directory " + regionDir 298 + " does not exist. Likely due to concurrent split/compaction. Skipping."); 299 missing.add(regionDir); 300 return; 301 } 302 303 LOG.info("Checking Region Directory {}. Number of entries = {}", regionDir, cfs.size()); 304 305 for (FileStatus cfFs : cfs) { 306 Path cfDir = cfFs.getPath(); 307 checkColFamDir(cfDir); 308 } 309 } 310 311 /** 312 * Check all the regiondirs in the specified tableDir path to a table 313 */ 314 void checkTableDir(Path tableDir) throws IOException { 315 List<FileStatus> rds = 316 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 317 if (rds == null) { 318 if (!fs.exists(tableDir)) { 319 LOG.warn("Table Directory " + tableDir 320 + " does not exist. Likely due to concurrent delete. Skipping."); 321 missing.add(tableDir); 322 } 323 return; 324 } 325 326 LOG.info("Checking Table Directory {}. Number of entries (including mob) = {}", tableDir, 327 rds.size() + 1); 328 329 // Parallelize check at the region dir level 330 List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1); 331 List<Future<Void>> rdFutures; 332 333 for (FileStatus rdFs : rds) { 334 Path rdDir = rdFs.getPath(); 335 RegionDirChecker work = new RegionDirChecker(rdDir); 336 rdcs.add(work); 337 } 338 339 // add mob region 340 rdcs.add(createMobRegionDirChecker(tableDir)); 341 // Submit and wait for completion 342 try { 343 rdFutures = executor.invokeAll(rdcs); 344 } catch (InterruptedException ie) { 345 Thread.currentThread().interrupt(); 346 LOG.warn("Region dirs checking interrupted!", ie); 347 return; 348 } 349 350 for (int i = 0; i < rdFutures.size(); i++) { 351 Future<Void> f = rdFutures.get(i); 352 try { 353 f.get(); 354 } catch (ExecutionException e) { 355 LOG.warn("Failed to quarantine an HFile in regiondir " + rdcs.get(i).regionDir, 356 e.getCause()); 357 // rethrow IOExceptions 358 if (e.getCause() instanceof IOException) { 359 throw (IOException) e.getCause(); 360 } 361 362 // rethrow RuntimeExceptions 363 if (e.getCause() instanceof RuntimeException) { 364 throw (RuntimeException) e.getCause(); 365 } 366 367 // this should never happen 368 LOG.error("Unexpected exception encountered", e); 369 return; // bailing out. 370 } catch (InterruptedException ie) { 371 Thread.currentThread().interrupt(); 372 LOG.warn("Region dirs check interrupted!", ie); 373 // bailing out 374 return; 375 } 376 } 377 } 378 379 /** 380 * An individual work item for parallelized regiondir processing. This is intentionally an inner 381 * class so it can use the shared error sets and fs. 382 */ 383 private class RegionDirChecker implements Callable<Void> { 384 final Path regionDir; 385 386 RegionDirChecker(Path regionDir) { 387 this.regionDir = regionDir; 388 } 389 390 @Override 391 public Void call() throws IOException { 392 checkRegionDir(regionDir); 393 return null; 394 } 395 } 396 397 /** 398 * An individual work item for parallelized mob dir processing. This is intentionally an inner 399 * class so it can use the shared error sets and fs. 400 */ 401 private class MobRegionDirChecker extends RegionDirChecker { 402 403 MobRegionDirChecker(Path regionDir) { 404 super(regionDir); 405 } 406 407 @Override 408 public Void call() throws IOException { 409 checkMobRegionDir(regionDir); 410 return null; 411 } 412 } 413 414 /** 415 * Creates an instance of MobRegionDirChecker. 416 * @param tableDir The current table directory. 417 * @return An instance of MobRegionDirChecker. 418 */ 419 private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) { 420 TableName tableName = CommonFSUtils.getTableName(tableDir); 421 Path mobDir = MobUtils.getMobRegionPath(conf, tableName); 422 return new MobRegionDirChecker(mobDir); 423 } 424 425 /** 426 * Check the specified table dirs for bad hfiles. 427 */ 428 public void checkTables(Collection<Path> tables) throws IOException { 429 for (Path t : tables) { 430 checkTableDir(t); 431 } 432 } 433 434 /** Returns the set of check failure file paths after checkTables is called. */ 435 public Collection<Path> getFailures() { 436 return new HashSet<>(failures); 437 } 438 439 /** Returns the set of corrupted file paths after checkTables is called. */ 440 public Collection<Path> getCorrupted() { 441 return new HashSet<>(corrupted); 442 } 443 444 /** Returns number of hfiles checked in the last HfileCorruptionChecker run */ 445 public int getHFilesChecked() { 446 return hfilesChecked.get(); 447 } 448 449 /** Returns the set of successfully quarantined paths after checkTables is called. */ 450 public Collection<Path> getQuarantined() { 451 return new HashSet<>(quarantined); 452 } 453 454 /** 455 * @return the set of paths that were missing. Likely due to deletion/moves from compaction or 456 * flushes. 457 */ 458 public Collection<Path> getMissing() { 459 return new HashSet<>(missing); 460 } 461 462 /** Returns the set of check failure mob file paths after checkTables is called. */ 463 public Collection<Path> getFailureMobFiles() { 464 return new HashSet<>(failureMobFiles); 465 } 466 467 /** Returns the set of corrupted mob file paths after checkTables is called. */ 468 public Collection<Path> getCorruptedMobFiles() { 469 return new HashSet<>(corruptedMobFiles); 470 } 471 472 /** Returns number of mob files checked in the last HfileCorruptionChecker run */ 473 public int getMobFilesChecked() { 474 return mobFilesChecked.get(); 475 } 476 477 /** Returns the set of successfully quarantined paths after checkTables is called. */ 478 public Collection<Path> getQuarantinedMobFiles() { 479 return new HashSet<>(quarantinedMobFiles); 480 } 481 482 /** 483 * @return the set of paths that were missing. Likely due to table deletion or deletion/moves from 484 * compaction. 485 */ 486 public Collection<Path> getMissedMobFiles() { 487 return new HashSet<>(missedMobFiles); 488 } 489 490 /** 491 * Print a human readable summary of hfile quarantining operations. 492 */ 493 public void report(HbckErrorReporter out) { 494 out.print("Checked " + hfilesChecked.get() + " hfile for corruption"); 495 out.print(" HFiles corrupted: " + corrupted.size()); 496 if (inQuarantineMode) { 497 out.print(" HFiles successfully quarantined: " + quarantined.size()); 498 for (Path sq : quarantined) { 499 out.print(" " + sq); 500 } 501 out.print(" HFiles failed quarantine: " + failures.size()); 502 for (Path fq : failures) { 503 out.print(" " + fq); 504 } 505 } 506 out.print(" HFiles moved while checking: " + missing.size()); 507 for (Path mq : missing) { 508 out.print(" " + mq); 509 } 510 511 String initialState = (corrupted.isEmpty()) ? "OK" : "CORRUPTED"; 512 String fixedState = (corrupted.size() == quarantined.size()) ? "OK" : "CORRUPTED"; 513 514 // print mob-related report 515 out.print("Checked " + mobFilesChecked.get() + " Mob files for corruption"); 516 out.print(" Mob files corrupted: " + corruptedMobFiles.size()); 517 if (inQuarantineMode) { 518 out.print(" Mob files successfully quarantined: " + quarantinedMobFiles.size()); 519 for (Path sq : quarantinedMobFiles) { 520 out.print(" " + sq); 521 } 522 out.print(" Mob files failed quarantine: " + failureMobFiles.size()); 523 for (Path fq : failureMobFiles) { 524 out.print(" " + fq); 525 } 526 } 527 out.print(" Mob files moved while checking: " + missedMobFiles.size()); 528 for (Path mq : missedMobFiles) { 529 out.print(" " + mq); 530 } 531 String initialMobState = (corruptedMobFiles.isEmpty()) ? "OK" : "CORRUPTED"; 532 String fixedMobState = 533 (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK" : "CORRUPTED"; 534 535 if (inQuarantineMode) { 536 out.print("Summary: " + initialState + " => " + fixedState); 537 out.print("Mob summary: " + initialMobState + " => " + fixedMobState); 538 } else { 539 out.print("Summary: " + initialState); 540 out.print("Mob summary: " + initialMobState); 541 } 542 } 543}