/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
import org.apache.hadoop.hbase.util.HbckErrorReporter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * This class marches through all of the region's hfiles and verifies that
 * they are all valid files. One just needs to instantiate the class, use
 * {@link #checkTables(Collection)} and then retrieve the corrupted hfiles (and
 * quarantined files if in quarantining mode).
 * <p>
 * The implementation currently parallelizes at the regionDir level: every region
 * directory of a table (plus the table's mob region directory) is submitted as a
 * separate task to the supplied executor, and the result sets are concurrent
 * collections shared by those tasks.
 */
@InterfaceAudience.Private
public class HFileCorruptionChecker {
  private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class);

  final Configuration conf;
  final FileSystem fs;
  final CacheConfig cacheConf;
  final ExecutorService executor;
  // Result sets for regular hfiles; written concurrently by the region dir tasks.
  final Set<Path> corrupted = new ConcurrentSkipListSet<>();
  final Set<Path> failures = new ConcurrentSkipListSet<>();
  final Set<Path> quarantined = new ConcurrentSkipListSet<>();
  final Set<Path> missing = new ConcurrentSkipListSet<>();
  // Result sets for mob files; same roles as the four sets above.
  final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>();
  final boolean inQuarantineMode;
  final AtomicInteger hfilesChecked = new AtomicInteger();
  final AtomicInteger mobFilesChecked = new AtomicInteger();

  /**
   * Creates a checker that reads files through the file system obtained from
   * {@code FileSystem.get(conf)} and uses {@link CacheConfig#DISABLED} when opening them.
   *
   * @param conf
   *          configuration used to resolve the file system and the HBase root dir
   * @param executor
   *          executor on which the per-region-directory checks are run
   * @param quarantine
   *          if {@code true}, corrupt files are moved to the quarantine location
   * @throws IOException
   *           if the file system cannot be obtained
   */
  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
      boolean quarantine) throws IOException {
    this.conf = conf;
    this.fs = FileSystem.get(conf);
    this.cacheConf = CacheConfig.DISABLED;
    this.executor = executor;
    this.inQuarantineMode = quarantine;
  }

  /**
   * Checks a path to see if it is a valid hfile. A corrupt file is recorded in
   * {@code corrupted} (and quarantined when in quarantine mode); a file that has
   * disappeared is recorded in {@code missing}. The checked counter is bumped in
   * every case.
   *
   * @param p
   *          full Path to an HFile
   * @throws IOException
   *           This is a connectivity related exception
   */
  protected void checkHFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, true, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt HFile {}", p, che);
      corrupted.add(p);
      if (inQuarantineMode) {
        quarantine(p, "HFile", failures, quarantined);
      }
    } catch (FileNotFoundException fnfe) {
      LOG.warn("HFile {} was missing. Likely removed due to compaction/split?", p);
      missing.add(p);
    } finally {
      hfilesChecked.incrementAndGet();
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Moves a corrupt file to its quarantine location and records the outcome.
   * Shared by {@link #checkHFile(Path)} and {@link #checkMobFile(Path)}.
   *
   * @param p
   *          path of the corrupt file
   * @param fileKind
   *          label used in the log message ("HFile" or "mob file")
   * @param failedSet
   *          receives {@code p} when the move did not succeed
   * @param quarantinedSet
   *          receives the destination path when the move succeeded
   * @throws IOException
   *           if a file system operation fails
   */
  private void quarantine(Path p, String fileKind, Set<Path> failedSet,
      Set<Path> quarantinedSet) throws IOException {
    Path dest = createQuarantinePath(p);
    LOG.warn("Quarantining corrupt {} {} into {}", fileKind, p, dest);
    // The rename is only attempted when the destination directory could be created.
    if (fs.mkdirs(dest.getParent()) && fs.rename(p, dest)) {
      quarantinedSet.add(dest);
    } else {
      failedSet.add(p);
    }
  }

  /**
   * Given a path, generates a new path to where we move a corrupted hfile (bad
   * trailer, no trailer).
   *
   * @param hFile
   *          Path to a corrupt hfile (assumes that it is
   *          HBASE_DIR/table/region/cf/file)
   * @return path to where corrupted files are stored. This should be
   *         HBASE_DIR/.corrupt/table/region/cf/file.
   * @throws IOException
   *           if the HBase root directory cannot be resolved
   */
  Path createQuarantinePath(Path hFile) throws IOException {
    // extract the normal dirs structure
    Path cfDir = hFile.getParent();
    Path regionDir = cfDir.getParent();
    Path tableDir = regionDir.getParent();

    // build up the corrupted dirs structure
    Path corruptBaseDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    if (conf.get("hbase.hfile.quarantine.dir") != null) {
      LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to {}", corruptBaseDir);
    }
    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
    return new Path(corruptFamilyDir, hFile.getName());
  }

  /**
   * Check all files in a column family dir.
   *
   * @param cfDir
   *          column family directory
   * @throws IOException
   *           if listing the directory or checking one of its files fails
   */
  protected void checkColFamDir(Path cfDir) throws IOException {
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(cfDir); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Colfam Directory {} does not exist. Likely due to concurrent split/compaction."
        + " Skipping.", cfDir);
      missing.add(cfDir);
      return;
    }

    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.isEmpty() && !fs.exists(cfDir)) {
      LOG.warn("Colfam Directory {} does not exist. Likely due to concurrent split/compaction."
        + " Skipping.", cfDir);
      missing.add(cfDir);
      return;
    }

    LOG.info("Checking Column Family Directory {}. Number of entries = {}", cfDir, hfs.size());

    for (FileStatus hfFs : hfs) {
      checkHFile(hfFs.getPath());
    }
  }

  /**
   * Check all files in a mob column family dir.
   *
   * @param cfDir
   *          mob column family directory
   * @throws IOException
   *           if listing the directory or checking one of its files fails
   */
  protected void checkMobColFamDir(Path cfDir) throws IOException {
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(cfDir); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Mob colfam Directory {} does not exist. Likely the table is deleted. Skipping.",
        cfDir);
      missedMobFiles.add(cfDir);
      return;
    }

    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.isEmpty() && !fs.exists(cfDir)) {
      LOG.warn("Mob colfam Directory {} does not exist. Likely the table is deleted. Skipping.",
        cfDir);
      missedMobFiles.add(cfDir);
      return;
    }

    LOG.info("Checking MOB Column Family Directory {}. Number of entries = {}", cfDir, hfs.size());

    for (FileStatus hfFs : hfs) {
      checkMobFile(hfFs.getPath());
    }
  }

  /**
   * Checks a path to see if it is a valid mob file. A corrupt file is recorded in
   * {@code corruptedMobFiles} (and quarantined when in quarantine mode); a file
   * that has disappeared is recorded in {@code missedMobFiles}. The checked
   * counter is bumped in every case.
   *
   * @param p
   *          full Path to a mob file.
   * @throws IOException
   *           This is a connectivity related exception
   */
  protected void checkMobFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, true, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt mob file {}", p, che);
      corruptedMobFiles.add(p);
      if (inQuarantineMode) {
        quarantine(p, "mob file", failureMobFiles, quarantinedMobFiles);
      }
    } catch (FileNotFoundException fnfe) {
      LOG.warn("Mob file {} was missing. Likely removed due to compaction?", p);
      missedMobFiles.add(p);
    } finally {
      mobFilesChecked.incrementAndGet();
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Checks all the mob files of a table.
   *
   * @param regionDir
   *          The mob region directory
   * @throws IOException
   *           if listing the directory or checking one of its families fails
   */
  private void checkMobRegionDir(Path regionDir) throws IOException {
    // A table without a mob region directory has nothing to check.
    if (!fs.exists(regionDir)) {
      return;
    }
    FileStatus[] hfs;
    try {
      hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Mob directory {} does not exist. Likely the table is deleted. Skipping.",
        regionDir);
      missedMobFiles.add(regionDir);
      return;
    }

    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.length == 0 && !fs.exists(regionDir)) {
      LOG.warn("Mob directory {} does not exist. Likely the table is deleted. Skipping.",
        regionDir);
      missedMobFiles.add(regionDir);
      return;
    }

    LOG.info("Checking MOB Region Directory {}. Number of entries = {}", regionDir, hfs.length);

    for (FileStatus hfFs : hfs) {
      checkMobColFamDir(hfFs.getPath());
    }
  }

  /**
   * Check all column families in a region dir.
   *
   * @param regionDir
   *          region directory
   * @throws IOException
   *           if listing the directory or checking one of its families fails
   */
  protected void checkRegionDir(Path regionDir) throws IOException {
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(regionDir);
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      LOG.warn("Region Directory {} does not exist. Likely due to concurrent split/compaction."
        + " Skipping.", regionDir);
      missing.add(regionDir);
      return;
    }

    List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (cfs.isEmpty() && !fs.exists(regionDir)) {
      LOG.warn("Region Directory {} does not exist. Likely due to concurrent split/compaction."
        + " Skipping.", regionDir);
      missing.add(regionDir);
      return;
    }

    LOG.info("Checking Region Directory {}. Number of entries = {}", regionDir, cfs.size());

    for (FileStatus cfFs : cfs) {
      checkColFamDir(cfFs.getPath());
    }
  }

  /**
   * Check all the regiondirs in the specified tableDir. One task per region
   * directory, plus one for the table's mob region directory, is submitted to the
   * executor; this method then waits for all of them.
   *
   * @param tableDir
   *          path to a table
   * @throws IOException
   *           if listing the table directory fails, or if one of the region
   *           tasks failed with an IOException (which is rethrown)
   */
  void checkTableDir(Path tableDir) throws IOException {
    List<FileStatus> rds =
      FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      if (!fs.exists(tableDir)) {
        LOG.warn("Table Directory {} does not exist. Likely due to concurrent delete. Skipping.",
          tableDir);
        missing.add(tableDir);
      }
      return;
    }

    LOG.info("Checking Table Directory {}. Number of entries (including mob) = {}", tableDir,
      rds.size() + 1);

    // Parallelize check at the region dir level
    List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1);
    for (FileStatus rdFs : rds) {
      rdcs.add(new RegionDirChecker(rdFs.getPath()));
    }

    // add mob region
    rdcs.add(createMobRegionDirChecker(tableDir));

    // Submit and wait for completion
    List<Future<Void>> rdFutures;
    try {
      rdFutures = executor.invokeAll(rdcs);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Region dirs checking interrupted!", ie);
      return;
    }

    // Futures come back in submission order, so index i matches rdcs.get(i).
    for (int i = 0; i < rdFutures.size(); i++) {
      Future<Void> f = rdFutures.get(i);
      try {
        f.get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        LOG.warn("Failed to quarantine an HFile in regiondir {}", rdcs.get(i).regionDir, cause);
        // rethrow IOExceptions
        if (cause instanceof IOException) {
          throw (IOException) cause;
        }

        // rethrow RuntimeExceptions
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        }

        // this should never happen
        LOG.error("Unexpected exception encountered", e);
        return; // bailing out.
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.warn("Region dirs check interrupted!", ie);
        // bailing out
        return;
      }
    }
  }

  /**
   * An individual work item for parallelized regiondir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class RegionDirChecker implements Callable<Void> {
    final Path regionDir;

    RegionDirChecker(Path regionDir) {
      this.regionDir = regionDir;
    }

    @Override
    public Void call() throws IOException {
      checkRegionDir(regionDir);
      return null;
    }
  }

  /**
   * An individual work item for parallelized mob dir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class MobRegionDirChecker extends RegionDirChecker {

    MobRegionDirChecker(Path regionDir) {
      super(regionDir);
    }

    @Override
    public Void call() throws IOException {
      checkMobRegionDir(regionDir);
      return null;
    }
  }

  /**
   * Creates an instance of MobRegionDirChecker.
   *
   * @param tableDir
   *          The current table directory.
   * @return An instance of MobRegionDirChecker.
   */
  private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
    TableName tableName = CommonFSUtils.getTableName(tableDir);
    Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
    return new MobRegionDirChecker(mobDir);
  }

  /**
   * Check the specified table dirs for bad hfiles.
   *
   * @param tables
   *          table directories to check, processed one after another
   * @throws IOException
   *           if checking one of the table directories fails
   */
  public void checkTables(Collection<Path> tables) throws IOException {
    for (Path t : tables) {
      checkTableDir(t);
    }
  }

  /**
   * @return the set of check failure file paths after checkTables is called.
   */
  public Collection<Path> getFailures() {
    return new HashSet<>(failures);
  }

  /**
   * @return the set of corrupted file paths after checkTables is called.
   */
  public Collection<Path> getCorrupted() {
    return new HashSet<>(corrupted);
  }

  /**
   * @return number of hfiles checked in the last HFileCorruptionChecker run
   */
  public int getHFilesChecked() {
    return hfilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined paths after checkTables is called.
   */
  public Collection<Path> getQuarantined() {
    return new HashSet<>(quarantined);
  }

  /**
   * @return the set of paths that were missing. Likely due to deletion/moves from
   *         compaction or flushes.
   */
  public Collection<Path> getMissing() {
    return new HashSet<>(missing);
  }

  /**
   * @return the set of check failure mob file paths after checkTables is called.
   */
  public Collection<Path> getFailureMobFiles() {
    return new HashSet<>(failureMobFiles);
  }

  /**
   * @return the set of corrupted mob file paths after checkTables is called.
   */
  public Collection<Path> getCorruptedMobFiles() {
    return new HashSet<>(corruptedMobFiles);
  }

  /**
   * @return number of mob files checked in the last HFileCorruptionChecker run
   */
  public int getMobFilesChecked() {
    return mobFilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined mob file paths after checkTables is called.
   */
  public Collection<Path> getQuarantinedMobFiles() {
    return new HashSet<>(quarantinedMobFiles);
  }

  /**
   * @return the set of paths that were missing. Likely due to table deletion or
   *         deletion/moves from compaction.
   */
  public Collection<Path> getMissedMobFiles() {
    return new HashSet<>(missedMobFiles);
  }

  /**
   * Print a human readable summary of hfile quarantining operations, first for
   * regular hfiles and then for mob files, followed by a one-line summary of each.
   *
   * @param out
   *          reporter that receives the summary lines
   */
  public void report(HbckErrorReporter out) {
    out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
    out.print(" HFiles corrupted: " + corrupted.size());
    if (inQuarantineMode) {
      out.print(" HFiles successfully quarantined: " + quarantined.size());
      for (Path sq : quarantined) {
        out.print(" " + sq);
      }
      out.print(" HFiles failed quarantine: " + failures.size());
      for (Path fq : failures) {
        out.print(" " + fq);
      }
    }
    out.print(" HFiles moved while checking: " + missing.size());
    for (Path mq : missing) {
      out.print(" " + mq);
    }

    String initialState = corrupted.isEmpty() ? "OK" : "CORRUPTED";
    // "Fixed" means every corrupt file ended up quarantined.
    String fixedState = (corrupted.size() == quarantined.size()) ? "OK" : "CORRUPTED";

    // print mob-related report
    out.print("Checked " + mobFilesChecked.get() + " Mob files for corruption");
    out.print(" Mob files corrupted: " + corruptedMobFiles.size());
    if (inQuarantineMode) {
      out.print(" Mob files successfully quarantined: " + quarantinedMobFiles.size());
      for (Path sq : quarantinedMobFiles) {
        out.print(" " + sq);
      }
      out.print(" Mob files failed quarantine: " + failureMobFiles.size());
      for (Path fq : failureMobFiles) {
        out.print(" " + fq);
      }
    }
    out.print(" Mob files moved while checking: " + missedMobFiles.size());
    for (Path mq : missedMobFiles) {
      out.print(" " + mq);
    }
    String initialMobState = corruptedMobFiles.isEmpty() ? "OK" : "CORRUPTED";
    String fixedMobState =
      (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK" : "CORRUPTED";

    if (inQuarantineMode) {
      out.print("Summary: " + initialState + " => " + fixedState);
      out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
    } else {
      out.print("Summary: " + initialState);
      out.print("Mob summary: " + initialMobState);
    }
  }
}