/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.util.HbckErrorReporter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;

/**
 * This class marches through all of the region's hfiles and verifies that
 * they are all valid files. One just needs to instantiate the class, call
 * {@link #checkTables(Collection)} and then retrieve the corrupted hfiles (and
 * quarantined files if in quarantining mode).
 *
 * The implementation currently parallelizes at the regionDir level: each region
 * directory of a table (plus the table's mob region directory) is checked by a
 * separate task submitted to the supplied executor. The result sets and counters
 * are concurrent collections because those tasks update them.
 */
@InterfaceAudience.Private
public class HFileCorruptionChecker {
  private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class);

  final Configuration conf;
  final FileSystem fs;
  final CacheConfig cacheConf;
  final ExecutorService executor;
  final Set<Path> corrupted = new ConcurrentSkipListSet<>();
  final Set<Path> failures = new ConcurrentSkipListSet<>();
  final Set<Path> quarantined = new ConcurrentSkipListSet<>();
  final Set<Path> missing = new ConcurrentSkipListSet<>();
  final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>();
  final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>();
  final boolean inQuarantineMode;
  final AtomicInteger hfilesChecked = new AtomicInteger();
  final AtomicInteger mobFilesChecked = new AtomicInteger();

  /**
   * Creates a checker bound to the filesystem described by {@code conf}.
   *
   * @param conf configuration used to obtain the filesystem and to open hfiles
   * @param executor executor that runs the per-region-directory checks
   * @param quarantine if true, corrupt files are moved aside instead of only being recorded
   * @throws IOException if the filesystem cannot be obtained
   */
  public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
      boolean quarantine) throws IOException {
    this.conf = conf;
    this.fs = FileSystem.get(conf);
    this.cacheConf = CacheConfig.DISABLED;
    this.executor = executor;
    this.inQuarantineMode = quarantine;
  }

  /**
   * Checks a path to see if it is a valid hfile. A corrupt file is recorded (and
   * quarantined when in quarantine mode); a file that has disappeared is recorded as
   * missing. Every call counts towards {@link #getHFilesChecked()}.
   *
   * @param p
   *          full Path to an HFile
   * @throws IOException
   *           This is a connectivity related exception
   */
  protected void checkHFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, true, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt HFile {}", p, che);
      corrupted.add(p);
      if (inQuarantineMode) {
        quarantineCorruptFile(p, "HFile", quarantined, failures);
      }
    } catch (FileNotFoundException fnfe) {
      LOG.warn("HFile {} was missing. Likely removed due to compaction/split?", p);
      missing.add(p);
    } finally {
      hfilesChecked.incrementAndGet();
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Moves a corrupt file to its quarantine location and records the outcome.
   *
   * @param p path of the corrupt file
   * @param fileKind label used in the log message ("HFile" or "mob file")
   * @param quarantinedSink set receiving the destination path when the move succeeds
   * @param failedSink set receiving {@code p} when the directory creation or rename fails
   * @throws IOException if the filesystem operations throw
   */
  private void quarantineCorruptFile(Path p, String fileKind, Set<Path> quarantinedSink,
      Set<Path> failedSink) throws IOException {
    Path dest = createQuarantinePath(p);
    LOG.warn("Quarantining corrupt {} {} into {}", fileKind, p, dest);
    // Only attempt the rename once the destination directory is known to exist.
    boolean moved = fs.mkdirs(dest.getParent()) && fs.rename(p, dest);
    if (moved) {
      quarantinedSink.add(dest);
    } else {
      failedSink.add(p);
    }
  }

  /**
   * Logs that a directory vanished while being checked and records it.
   *
   * @param dirKind label for the kind of directory, used as the start of the log message
   * @param dir the directory that does not exist
   * @param likelyCause sentence describing the probable reason, included in the log message
   * @param sink set in which the missing directory is recorded
   */
  private static void recordMissingDir(String dirKind, Path dir, String likelyCause,
      Set<Path> sink) {
    LOG.warn("{} {} does not exist. {} Skipping.", dirKind, dir, likelyCause);
    sink.add(dir);
  }

  /**
   * Given a path, generates a new path to where we move a corrupted hfile (bad
   * trailer, no trailer).
   *
   * @param hFile
   *          Path to a corrupt hfile (assumes that it is HBASE_DIR/ table
   *          /region/cf/file)
   * @return path to where corrupted files are stored. This should be
   *         HBASE_DIR/.corrupt/table/region/cf/file.
   * @throws IOException if the HBase root directory cannot be resolved
   */
  Path createQuarantinePath(Path hFile) throws IOException {
    // extract the normal dirs structure
    Path cfDir = hFile.getParent();
    Path regionDir = cfDir.getParent();
    Path tableDir = regionDir.getParent();

    // build up the corrupted dirs structure
    Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    if (conf.get("hbase.hfile.quarantine.dir") != null) {
      LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to {}", corruptBaseDir);
    }
    Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
    Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
    Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
    return new Path(corruptFamilyDir, hFile.getName());
  }

  /**
   * Check all files in a column family dir.
   *
   * @param cfDir
   *          column family directory
   * @throws IOException if listing the directory or checking one of its files fails
   */
  protected void checkColFamDir(Path cfDir) throws IOException {
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(cfDir); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      recordMissingDir("Colfam Directory", cfDir,
        "Likely due to concurrent split/compaction.", missing);
      return;
    }

    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.isEmpty() && !fs.exists(cfDir)) {
      recordMissingDir("Colfam Directory", cfDir,
        "Likely due to concurrent split/compaction.", missing);
      return;
    }
    for (FileStatus hfFs : hfs) {
      checkHFile(hfFs.getPath());
    }
  }

  /**
   * Check all files in a mob column family dir.
   *
   * @param cfDir
   *          mob column family directory
   * @throws IOException if listing the directory or checking one of its files fails
   */
  protected void checkMobColFamDir(Path cfDir) throws IOException {
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(cfDir); // use same filter as scanner.
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      recordMissingDir("Mob colfam Directory", cfDir, "Likely the table is deleted.",
        missedMobFiles);
      return;
    }

    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.isEmpty() && !fs.exists(cfDir)) {
      recordMissingDir("Mob colfam Directory", cfDir, "Likely the table is deleted.",
        missedMobFiles);
      return;
    }
    for (FileStatus hfFs : hfs) {
      checkMobFile(hfFs.getPath());
    }
  }

  /**
   * Checks a path to see if it is a valid mob file. A corrupt file is recorded (and
   * quarantined when in quarantine mode); a file that has disappeared is recorded as
   * missed. Every call counts towards {@link #getMobFilesChecked()}.
   *
   * @param p
   *          full Path to a mob file.
   * @throws IOException
   *           This is a connectivity related exception
   */
  protected void checkMobFile(Path p) throws IOException {
    HFile.Reader r = null;
    try {
      r = HFile.createReader(fs, p, cacheConf, true, conf);
    } catch (CorruptHFileException che) {
      LOG.warn("Found corrupt mob file {}", p, che);
      corruptedMobFiles.add(p);
      if (inQuarantineMode) {
        quarantineCorruptFile(p, "mob file", quarantinedMobFiles, failureMobFiles);
      }
    } catch (FileNotFoundException fnfe) {
      LOG.warn("Mob file {} was missing. Likely removed due to compaction?", p);
      missedMobFiles.add(p);
    } finally {
      mobFilesChecked.incrementAndGet();
      if (r != null) {
        r.close(true);
      }
    }
  }

  /**
   * Checks all the mob files of a table. Returns silently when the mob region
   * directory does not exist at the time of the call.
   *
   * @param regionDir The mob region directory
   * @throws IOException if listing the directory or checking one of its families fails
   */
  private void checkMobRegionDir(Path regionDir) throws IOException {
    if (!fs.exists(regionDir)) {
      return;
    }
    FileStatus[] hfs;
    try {
      hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      recordMissingDir("Mob directory", regionDir, "Likely the table is deleted.",
        missedMobFiles);
      return;
    }

    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (hfs.length == 0 && !fs.exists(regionDir)) {
      recordMissingDir("Mob directory", regionDir, "Likely the table is deleted.",
        missedMobFiles);
      return;
    }
    for (FileStatus hfFs : hfs) {
      checkMobColFamDir(hfFs.getPath());
    }
  }

  /**
   * Check all column families in a region dir.
   *
   * @param regionDir
   *          region directory
   * @throws IOException if listing the directory or checking one of its families fails
   */
  protected void checkRegionDir(Path regionDir) throws IOException {
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(regionDir);
    } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
      recordMissingDir("Region Directory", regionDir,
        "Likely due to concurrent split/compaction.", missing);
      return;
    }

    List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    if (cfs.isEmpty() && !fs.exists(regionDir)) {
      recordMissingDir("Region Directory", regionDir,
        "Likely due to concurrent split/compaction.", missing);
      return;
    }

    for (FileStatus cfFs : cfs) {
      checkColFamDir(cfFs.getPath());
    }
  }

  /**
   * Check all the regiondirs in the specified tableDir, plus the table's mob region
   * directory. One task per directory is submitted to the executor and this method
   * waits for all of them.
   *
   * @param tableDir
   *          path to a table
   * @throws IOException if any of the per-directory checks failed with an IOException
   */
  void checkTableDir(Path tableDir) throws IOException {
    List<FileStatus> rds =
      FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      if (!fs.exists(tableDir)) {
        recordMissingDir("Table Directory", tableDir, "Likely due to concurrent delete.",
          missing);
      }
      return;
    }

    // Parallelize check at the region dir level
    List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1);
    for (FileStatus rdFs : rds) {
      rdcs.add(new RegionDirChecker(rdFs.getPath()));
    }
    // add mob region
    rdcs.add(createMobRegionDirChecker(tableDir));

    // Submit and wait for completion
    List<Future<Void>> rdFutures;
    try {
      rdFutures = executor.invokeAll(rdcs);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Region dirs checking interrupted!", ie);
      return;
    }

    // invokeAll returns futures in the same order as the submitted tasks, so index i
    // of rdFutures corresponds to index i of rdcs.
    for (int i = 0; i < rdFutures.size(); i++) {
      try {
        rdFutures.get(i).get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        LOG.warn("Failed to quarantine an HFile in regiondir {}", rdcs.get(i).regionDir, cause);
        // rethrow IOExceptions
        if (cause instanceof IOException) {
          throw (IOException) cause;
        }

        // rethrow RuntimeExceptions
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        }

        // this should never happen
        LOG.error("Unexpected exception encountered", e);
        return; // bailing out.
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.warn("Region dirs check interrupted!", ie);
        // bailing out
        return;
      }
    }
  }

  /**
   * An individual work item for parallelized regiondir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class RegionDirChecker implements Callable<Void> {
    final Path regionDir;

    RegionDirChecker(Path regionDir) {
      this.regionDir = regionDir;
    }

    @Override
    public Void call() throws IOException {
      checkRegionDir(regionDir);
      return null;
    }
  }

  /**
   * An individual work item for parallelized mob dir processing. This is
   * intentionally an inner class so it can use the shared error sets and fs.
   */
  private class MobRegionDirChecker extends RegionDirChecker {

    MobRegionDirChecker(Path regionDir) {
      super(regionDir);
    }

    @Override
    public Void call() throws IOException {
      checkMobRegionDir(regionDir);
      return null;
    }
  }

  /**
   * Creates an instance of MobRegionDirChecker.
   * @param tableDir The current table directory.
   * @return An instance of MobRegionDirChecker.
   */
  private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
    TableName tableName = FSUtils.getTableName(tableDir);
    Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
    return new MobRegionDirChecker(mobDir);
  }

  /**
   * Check the specified table dirs for bad hfiles.
   *
   * @param tables table directories to check, processed one after another
   * @throws IOException if checking any of the table directories fails
   */
  public void checkTables(Collection<Path> tables) throws IOException {
    for (Path t : tables) {
      checkTableDir(t);
    }
  }

  /**
   * @return the set of check failure file paths after checkTables is called.
   */
  public Collection<Path> getFailures() {
    return new HashSet<>(failures);
  }

  /**
   * @return the set of corrupted file paths after checkTables is called.
   */
  public Collection<Path> getCorrupted() {
    return new HashSet<>(corrupted);
  }

  /**
   * @return number of hfiles checked in the last HfileCorruptionChecker run
   */
  public int getHFilesChecked() {
    return hfilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined paths after checkTables is called.
   */
  public Collection<Path> getQuarantined() {
    return new HashSet<>(quarantined);
  }

  /**
   * @return the set of paths that were missing.  Likely due to deletion/moves from
   *  compaction or flushes.
   */
  public Collection<Path> getMissing() {
    return new HashSet<>(missing);
  }

  /**
   * @return the set of check failure mob file paths after checkTables is called.
   */
  public Collection<Path> getFailureMobFiles() {
    return new HashSet<>(failureMobFiles);
  }

  /**
   * @return the set of corrupted mob file paths after checkTables is called.
   */
  public Collection<Path> getCorruptedMobFiles() {
    return new HashSet<>(corruptedMobFiles);
  }

  /**
   * @return number of mob files checked in the last HfileCorruptionChecker run
   */
  public int getMobFilesChecked() {
    return mobFilesChecked.get();
  }

  /**
   * @return the set of successfully quarantined paths after checkTables is called.
   */
  public Collection<Path> getQuarantinedMobFiles() {
    return new HashSet<>(quarantinedMobFiles);
  }

  /**
   * @return the set of paths that were missing.  Likely due to table deletion or
   *  deletion/moves from compaction.
   */
  public Collection<Path> getMissedMobFiles() {
    return new HashSet<>(missedMobFiles);
  }

  /**
   * Print a human readable summary of hfile quarantining operations, first for
   * regular hfiles and then for mob files.
   *
   * @param out reporter that receives the summary lines
   */
  public void report(HbckErrorReporter out) {
    out.print("Checked " + hfilesChecked.get() + " hfile for corruption");
    out.print(" HFiles corrupted: " + corrupted.size());
    if (inQuarantineMode) {
      out.print(" HFiles successfully quarantined: " + quarantined.size());
      printPaths(out, quarantined);
      out.print(" HFiles failed quarantine: " + failures.size());
      printPaths(out, failures);
    }
    out.print(" HFiles moved while checking: " + missing.size());
    printPaths(out, missing);

    String initialState = corrupted.isEmpty() ? "OK" : "CORRUPTED";
    String fixedState = (corrupted.size() == quarantined.size()) ? "OK" : "CORRUPTED";

    // print mob-related report; the counts mirror the hfile section so that a
    // "CORRUPTED" mob summary is backed by visible numbers.
    out.print("Checked " + mobFilesChecked.get() + " Mob files for corruption");
    out.print(" Mob files corrupted: " + corruptedMobFiles.size());
    if (inQuarantineMode) {
      out.print(" Mob files successfully quarantined: " + quarantinedMobFiles.size());
      printPaths(out, quarantinedMobFiles);
      out.print(" Mob files failed quarantine: " + failureMobFiles.size());
      printPaths(out, failureMobFiles);
    }
    out.print(" Mob files moved while checking: " + missedMobFiles.size());
    printPaths(out, missedMobFiles);

    String initialMobState = corruptedMobFiles.isEmpty() ? "OK" : "CORRUPTED";
    String fixedMobState =
      (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK" : "CORRUPTED";

    if (inQuarantineMode) {
      out.print("Summary: " + initialState + " => " + fixedState);
      out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
    } else {
      out.print("Summary: " + initialState);
      out.print("Mob summary: " + initialMobState);
    }
  }

  /**
   * Prints each path of a result set on its own line.
   *
   * @param out reporter that receives the lines
   * @param paths paths to print
   */
  private static void printPaths(HbckErrorReporter out, Collection<Path> paths) {
    for (Path path : paths) {
      out.print(" " + path);
    }
  }
}