001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Queue; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.Future; 033import java.util.concurrent.ThreadFactory; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.Function; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileStatus; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.fs.PathFilter; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.regionserver.HStoreFile; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.FSUtils; 052import org.apache.hadoop.hbase.util.HFileArchiveUtil; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.hadoop.io.MultipleIOException; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060 061/** 062 * Utility class to handle the removal of HFiles (or the respective {@link HStoreFile StoreFiles}) 063 * for a HRegion from the {@link FileSystem}. The hfiles will be archived or deleted, depending on 064 * the state of the system. 065 */ 066@InterfaceAudience.Private 067public class HFileArchiver { 068 private static final Logger LOG = LoggerFactory.getLogger(HFileArchiver.class); 069 private static final String SEPARATOR = "."; 070 071 /** Number of retries in case of fs operation failure */ 072 private static final int DEFAULT_RETRIES_NUMBER = 3; 073 074 private static final Function<File, Path> FUNC_FILE_TO_PATH = new Function<File, Path>() { 075 @Override 076 public Path apply(File file) { 077 return file == null ? null : file.getPath(); 078 } 079 }; 080 081 private static ThreadPoolExecutor archiveExecutor; 082 083 private HFileArchiver() { 084 // hidden ctor since this is just a util 085 } 086 087 /** Returns True if the Region exits in the filesystem. */ 088 public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info) 089 throws IOException { 090 Path rootDir = CommonFSUtils.getRootDir(conf); 091 Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, info); 092 return fs.exists(regionDir); 093 } 094 095 /** 096 * Cleans up all the files for a HRegion by archiving the HFiles to the archive directory 097 * @param conf the configuration to use 098 * @param fs the file system object 099 * @param info RegionInfo for region to be deleted 100 */ 101 public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info) 102 throws IOException { 103 Path rootDir = CommonFSUtils.getRootDir(conf); 104 archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()), 105 FSUtils.getRegionDirFromRootDir(rootDir, info)); 106 } 107 108 /** 109 * Cleans up all the files for a HRegion by archiving the HFiles to the archive directory 110 * @param conf the configuration to use 111 * @param fs the file system object 112 * @param info RegionInfo for region to be deleted 113 * @param rootDir {@link Path} to the root directory where hbase files are stored (for building 114 * the archive path) 115 * @param tableDir {@link Path} to where the table is being stored (for building the archive path) 116 */ 117 public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info, Path rootDir, 118 Path tableDir) throws IOException { 119 archiveRegion(conf, fs, rootDir, tableDir, FSUtils.getRegionDirFromRootDir(rootDir, info)); 120 } 121 122 /** 123 * Remove an entire region from the table directory via archiving the region's hfiles. 124 * @param fs {@link FileSystem} from which to remove the region 125 * @param rootdir {@link Path} to the root directory where hbase files are stored (for building 126 * the archive path) 127 * @param tableDir {@link Path} to where the table is being stored (for building the archive 128 * path) 129 * @param regionDir {@link Path} to where a region is being stored (for building the archive path) 130 * @return <tt>true</tt> if the region was successfully deleted. <tt>false</tt> if the filesystem 131 * operations could not complete. 132 * @throws IOException if the request cannot be completed 133 */ 134 public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir, 135 Path tableDir, Path regionDir) throws IOException { 136 // otherwise, we archive the files 137 // make sure we can archive 138 if (tableDir == null || regionDir == null) { 139 LOG.error("No archive directory could be found because tabledir (" + tableDir 140 + ") or regiondir (" + regionDir + "was null. Deleting files instead."); 141 if (regionDir != null) { 142 deleteRegionWithoutArchiving(fs, regionDir); 143 } 144 // we should have archived, but failed to. Doesn't matter if we deleted 145 // the archived files correctly or not. 146 return false; 147 } 148 149 LOG.debug("ARCHIVING {}", regionDir); 150 151 // make sure the regiondir lives under the tabledir 152 Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString())); 153 Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir, 154 CommonFSUtils.getTableName(tableDir), regionDir.getName()); 155 156 FileStatusConverter getAsFile = new FileStatusConverter(fs); 157 // otherwise, we attempt to archive the store files 158 159 // build collection of just the store directories to archive 160 Collection<File> toArchive = new ArrayList<>(); 161 final PathFilter dirFilter = new FSUtils.DirFilter(fs); 162 PathFilter nonHidden = new PathFilter() { 163 @Override 164 public boolean accept(Path file) { 165 return dirFilter.accept(file) && !file.getName().startsWith("."); 166 } 167 }; 168 FileStatus[] storeDirs = CommonFSUtils.listStatus(fs, regionDir, nonHidden); 169 // if there no files, we can just delete the directory and return; 170 if (storeDirs == null) { 171 LOG.debug("Directory {} empty.", regionDir); 172 return deleteRegionWithoutArchiving(fs, regionDir); 173 } 174 175 // convert the files in the region to a File 176 Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add); 177 LOG.debug("Archiving " + toArchive); 178 List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive, 179 EnvironmentEdgeManager.currentTime()); 180 if (!failedArchive.isEmpty()) { 181 throw new FailedArchiveException( 182 "Failed to archive/delete all the files for region:" + regionDir.getName() + " into " 183 + regionArchiveDir + ". Something is probably awry on the filesystem.", 184 failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); 185 } 186 // if that was successful, then we delete the region 187 return deleteRegionWithoutArchiving(fs, regionDir); 188 } 189 190 /** 191 * Archive the specified regions in parallel. 192 * @param conf the configuration to use 193 * @param fs {@link FileSystem} from which to remove the region 194 * @param rootDir {@link Path} to the root directory where hbase files are stored (for 195 * building the archive path) 196 * @param tableDir {@link Path} to where the table is being stored (for building the archive 197 * path) 198 * @param regionDirList {@link Path} to where regions are being stored (for building the archive 199 * path) 200 * @throws IOException if the request cannot be completed 201 */ 202 public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDir, Path tableDir, 203 List<Path> regionDirList) throws IOException { 204 List<Future<Void>> futures = new ArrayList<>(regionDirList.size()); 205 for (Path regionDir : regionDirList) { 206 Future<Void> future = getArchiveExecutor(conf).submit(() -> { 207 archiveRegion(conf, fs, rootDir, tableDir, regionDir); 208 return null; 209 }); 210 futures.add(future); 211 } 212 try { 213 for (Future<Void> future : futures) { 214 future.get(); 215 } 216 } catch (InterruptedException e) { 217 throw new InterruptedIOException(e.getMessage()); 218 } catch (ExecutionException e) { 219 throw new IOException(e.getCause()); 220 } 221 } 222 223 private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) { 224 if (archiveExecutor == null) { 225 int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8); 226 archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, 227 getThreadFactory("HFileArchiver")); 228 229 // Shutdown this ThreadPool in a shutdown hook 230 Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown())); 231 } 232 return archiveExecutor; 233 } 234 235 // We need this method instead of Threads.getNamedThreadFactory() to pass some tests. 236 // The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for 237 // new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related 238 // issues in some tests. 239 private static ThreadFactory getThreadFactory(String archiverName) { 240 return new ThreadFactory() { 241 final AtomicInteger threadNumber = new AtomicInteger(1); 242 243 @Override 244 public Thread newThread(Runnable r) { 245 final String name = archiverName + "-" + threadNumber.getAndIncrement(); 246 Thread t = new Thread(r, name); 247 t.setDaemon(true); 248 return t; 249 } 250 }; 251 } 252 253 /** 254 * Remove from the specified region the store files of the specified column family, either by 255 * archiving them or outright deletion 256 * @param fs the filesystem where the store files live 257 * @param conf {@link Configuration} to examine to determine the archive directory 258 * @param parent Parent region hosting the store files 259 * @param tableDir {@link Path} to where the table is being stored (for building the archive path) 260 * @param family the family hosting the store files 261 * @throws IOException if the files could not be correctly disposed. 262 */ 263 public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo parent, 264 Path tableDir, byte[] family) throws IOException { 265 Path familyDir = new Path(tableDir, new Path(parent.getEncodedName(), Bytes.toString(family))); 266 archiveFamilyByFamilyDir(fs, conf, parent, familyDir, family); 267 } 268 269 /** 270 * Removes from the specified region the store files of the specified column family, either by 271 * archiving them or outright deletion 272 * @param fs the filesystem where the store files live 273 * @param conf {@link Configuration} to examine to determine the archive directory 274 * @param parent Parent region hosting the store files 275 * @param familyDir {@link Path} to where the family is being stored 276 * @param family the family hosting the store files 277 * @throws IOException if the files could not be correctly disposed. 278 */ 279 public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent, 280 Path familyDir, byte[] family) throws IOException { 281 FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir); 282 if (storeFiles == null) { 283 LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(), 284 Bytes.toString(family)); 285 return; 286 } 287 288 FileStatusConverter getAsFile = new FileStatusConverter(fs); 289 Collection<File> toArchive = Stream.of(storeFiles).map(getAsFile).collect(Collectors.toList()); 290 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family); 291 292 // do the actual archive 293 List<File> failedArchive = 294 resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime()); 295 if (!failedArchive.isEmpty()) { 296 throw new FailedArchiveException( 297 "Failed to archive/delete all the files for region:" 298 + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family) + " into " 299 + storeArchiveDir + ". Something is probably awry on the filesystem.", 300 failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); 301 } 302 } 303 304 /** 305 * Remove the store files, either by archiving them or outright deletion 306 * @param conf {@link Configuration} to examine to determine the archive directory 307 * @param fs the filesystem where the store files live 308 * @param regionInfo {@link RegionInfo} of the region hosting the store files 309 * @param family the family hosting the store files 310 * @param compactedFiles files to be disposed of. No further reading of these files should be 311 * attempted; otherwise likely to cause an {@link IOException} 312 * @throws IOException if the files could not be correctly disposed. 313 */ 314 public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo, 315 Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException { 316 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family); 317 archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir); 318 } 319 320 /** 321 * Archive recovered edits using existing logic for archiving store files. This is currently only 322 * relevant when <b>hbase.region.archive.recovered.edits</b> is true, as recovered edits shouldn't 323 * be kept after replay. In theory, we could use very same method available for archiving store 324 * files, but supporting WAL dir and store files on different FileSystems added the need for extra 325 * validation of the passed FileSystem instance and the path where the archiving edits should be 326 * placed. 327 * @param conf {@link Configuration} to determine the archive directory. 328 * @param fs the filesystem used for storing WAL files. 329 * @param regionInfo {@link RegionInfo} a pseudo region representation for the archiving logic. 330 * @param family a pseudo familiy representation for the archiving logic. 331 * @param replayedEdits the recovered edits to be archived. 332 * @throws IOException if files can't be achived due to some internal error. 333 */ 334 public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, RegionInfo regionInfo, 335 byte[] family, Collection<HStoreFile> replayedEdits) throws IOException { 336 String workingDir = conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR)); 337 // extra sanity checks for the right FS 338 Path path = new Path(workingDir); 339 if (path.isAbsoluteAndSchemeAuthorityNull()) { 340 // no schema specified on wal dir value, so it's on same FS as StoreFiles 341 path = new Path(conf.get(HConstants.HBASE_DIR)); 342 } 343 if (path.toUri().getScheme() != null && !path.toUri().getScheme().equals(fs.getScheme())) { 344 throw new IOException( 345 "Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme()); 346 } 347 path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family); 348 archive(conf, fs, regionInfo, family, replayedEdits, path); 349 } 350 351 private static void archive(Configuration conf, FileSystem fs, RegionInfo regionInfo, 352 byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException { 353 // sometimes in testing, we don't have rss, so we need to check for that 354 if (fs == null) { 355 LOG.warn( 356 "Passed filesystem is null, so just deleting files without archiving for {}," + "family={}", 357 Bytes.toString(regionInfo.getRegionName()), Bytes.toString(family)); 358 deleteStoreFilesWithoutArchiving(compactedFiles); 359 return; 360 } 361 362 // short circuit if we don't have any files to delete 363 if (compactedFiles.isEmpty()) { 364 LOG.debug("No files to dispose of, done!"); 365 return; 366 } 367 368 // build the archive path 369 if (regionInfo == null || family == null) 370 throw new IOException("Need to have a region and a family to archive from."); 371 // make sure we don't archive if we can't and that the archive dir exists 372 if (!fs.mkdirs(storeArchiveDir)) { 373 throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:" 374 + Bytes.toString(family) + ", deleting compacted files instead."); 375 } 376 377 // otherwise we attempt to archive the store files 378 LOG.debug("Archiving compacted files."); 379 380 // Wrap the storefile into a File 381 StoreToFile getStorePath = new StoreToFile(fs); 382 Collection<File> storeFiles = 383 compactedFiles.stream().map(getStorePath).collect(Collectors.toList()); 384 385 // do the actual archive 386 List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles, 387 EnvironmentEdgeManager.currentTime()); 388 389 if (!failedArchive.isEmpty()) { 390 throw new FailedArchiveException( 391 "Failed to archive/delete all the files for region:" 392 + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family) 393 + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.", 394 failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); 395 } 396 } 397 398 /** 399 * Archive the store file 400 * @param fs the filesystem where the store files live 401 * @param regionInfo region hosting the store files 402 * @param conf {@link Configuration} to examine to determine the archive directory 403 * @param tableDir {@link Path} to where the table is being stored (for building the archive 404 * path) 405 * @param family the family hosting the store files 406 * @param storeFile file to be archived 407 * @throws IOException if the files could not be correctly disposed. 408 */ 409 public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInfo regionInfo, 410 Path tableDir, byte[] family, Path storeFile) throws IOException { 411 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family); 412 // make sure we don't archive if we can't and that the archive dir exists 413 if (!fs.mkdirs(storeArchiveDir)) { 414 throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:" 415 + Bytes.toString(family) + ", deleting compacted files instead."); 416 } 417 418 // do the actual archive 419 long start = EnvironmentEdgeManager.currentTime(); 420 File file = new FileablePath(fs, storeFile); 421 if (!resolveAndArchiveFile(storeArchiveDir, file, Long.toString(start))) { 422 throw new IOException("Failed to archive/delete the file for region:" 423 + regionInfo.getRegionNameAsString() + ", family:" + Bytes.toString(family) + " into " 424 + storeArchiveDir + ". Something is probably awry on the filesystem."); 425 } 426 } 427 428 /** 429 * Resolve any conflict with an existing archive file via timestamp-append renaming of the 430 * existing file and then archive the passed in files. 431 * @param fs {@link FileSystem} on which to archive the files 432 * @param baseArchiveDir base archive directory to store the files. If any of the files to archive 433 * are directories, will append the name of the directory to the base 434 * archive directory name, creating a parallel structure. 435 * @param toArchive files/directories that need to be archvied 436 * @param start time the archiving started - used for resolving archive conflicts. 437 * @return the list of failed to archive files. 438 * @throws IOException if an unexpected file operation exception occurred 439 */ 440 private static List<File> resolveAndArchive(Configuration conf, FileSystem fs, 441 Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException { 442 // Early exit if no files to archive 443 if (toArchive.isEmpty()) { 444 LOG.trace("No files to archive, returning an empty list."); 445 return Collections.emptyList(); 446 } 447 448 LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir); 449 450 // Ensure the archive directory exists 451 ensureArchiveDirectoryExists(fs, baseArchiveDir); 452 453 // Thread-safe collection for storing failures 454 Queue<File> failures = new ConcurrentLinkedQueue<>(); 455 String startTime = Long.toString(start); 456 457 // Separate files and directories for processing 458 List<File> filesOnly = new ArrayList<>(); 459 for (File file : toArchive) { 460 if (file.isFile()) { 461 filesOnly.add(file); 462 } else { 463 handleDirectory(conf, fs, baseArchiveDir, failures, file, start); 464 } 465 } 466 467 // Archive files concurrently 468 archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime); 469 470 return new ArrayList<>(failures); // Convert to a List for the return value 471 } 472 473 private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir) 474 throws IOException { 475 if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) { 476 throw new IOException("Failed to create the archive directory: " + baseArchiveDir); 477 } 478 LOG.trace("Archive directory ready: {}", baseArchiveDir); 479 } 480 481 private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir, 482 Queue<File> failures, File directory, long start) { 483 LOG.trace("Processing directory: {}, archiving its children.", directory); 484 Path subArchiveDir = new Path(baseArchiveDir, directory.getName()); 485 486 try { 487 Collection<File> children = directory.getChildren(); 488 failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start)); 489 } catch (IOException e) { 490 LOG.warn("Failed to archive directory: {}", directory, e); 491 failures.add(directory); 492 } 493 } 494 495 private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir, 496 List<File> files, Queue<File> failures, String startTime) { 497 LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir); 498 Map<File, Future<Boolean>> futureMap = new HashMap<>(); 499 // Submit file archiving tasks 500 // default is 16 which comes equal hbase.hstore.blockingStoreFiles default value 501 int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16); 502 ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, 503 TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-")); 504 try { 505 for (File file : files) { 506 Future<Boolean> future = hfilesArchiveExecutor 507 .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime)); 508 futureMap.put(file, future); 509 } 510 511 // Process results of each task 512 for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) { 513 File file = entry.getKey(); 514 try { 515 if (!entry.getValue().get()) { 516 LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir); 517 failures.add(file); 518 } 519 } catch (InterruptedException e) { 520 LOG.error("Archiving interrupted for file: {}", file, e); 521 Thread.currentThread().interrupt(); // Restore interrupt status 522 failures.add(file); 523 } catch (ExecutionException e) { 524 LOG.error("Archiving failed for file: {}", file, e); 525 failures.add(file); 526 } 527 } 528 } finally { 529 hfilesArchiveExecutor.shutdown(); 530 } 531 } 532 533 /** 534 * Attempt to archive the passed in file to the archive directory. 535 * <p> 536 * If the same file already exists in the archive, it is moved to a timestamped directory under 537 * the archive directory and the new file is put in its place. 538 * @param archiveDir {@link Path} to the directory that stores the archives of the hfiles 539 * @param currentFile {@link Path} to the original HFile that will be archived 540 * @param archiveStartTime time the archiving started, to resolve naming conflicts 541 * @return <tt>true</tt> if the file is successfully archived. <tt>false</tt> if there was a 542 * problem, but the operation still completed. 543 * @throws IOException on failure to complete {@link FileSystem} operations. 544 */ 545 private static boolean resolveAndArchiveFile(Path archiveDir, File currentFile, 546 String archiveStartTime) throws IOException { 547 // build path as it should be in the archive 548 String filename = currentFile.getName(); 549 Path archiveFile = new Path(archiveDir, filename); 550 FileSystem fs = currentFile.getFileSystem(); 551 552 // An existing destination file in the archive is unexpected, but we handle it here. 553 if (fs.exists(archiveFile)) { 554 if (!fs.exists(currentFile.getPath())) { 555 // If the file already exists in the archive, and there is no current file to archive, then 556 // assume that the file in archive is correct. This is an unexpected situation, suggesting a 557 // race condition or split brain. 558 // In HBASE-26718 this was found when compaction incorrectly happened during warmupRegion. 559 LOG.warn("{} exists in archive. Attempted to archive nonexistent file {}.", archiveFile, 560 currentFile); 561 // We return success to match existing behavior in this method, where FileNotFoundException 562 // in moveAndClose is ignored. 563 return true; 564 } 565 // There is a conflict between the current file and the already existing archived file. 566 // Move the archived file to a timestamped backup. This is a really, really unlikely 567 // situation, where we get the same name for the existing file, but is included just for that 568 // 1 in trillion chance. We are potentially incurring data loss in the archive directory if 569 // the files are not identical. The timestamped backup will be cleaned by HFileCleaner as it 570 // has no references. 571 FileStatus curStatus = fs.getFileStatus(currentFile.getPath()); 572 FileStatus archiveStatus = fs.getFileStatus(archiveFile); 573 long curLen = curStatus.getLen(); 574 long archiveLen = archiveStatus.getLen(); 575 long curMtime = curStatus.getModificationTime(); 576 long archiveMtime = archiveStatus.getModificationTime(); 577 if (curLen != archiveLen) { 578 LOG.error( 579 "{} already exists in archive with different size than current {}." 580 + " archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}", 581 archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime); 582 throw new IOException( 583 archiveFile + " already exists in archive with different size" + " than " + currentFile); 584 } 585 586 LOG.error( 587 "{} already exists in archive, moving to timestamped backup and overwriting" 588 + " current {}. archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}", 589 archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime); 590 591 // move the archive file to the stamped backup 592 Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime); 593 if (!fs.rename(archiveFile, backedupArchiveFile)) { 594 LOG.error("Could not rename archive file to backup: " + backedupArchiveFile 595 + ", deleting existing file in favor of newer."); 596 // try to delete the existing file, if we can't rename it 597 if (!fs.delete(archiveFile, false)) { 598 throw new IOException("Couldn't delete existing archive file (" + archiveFile 599 + ") or rename it to the backup file (" + backedupArchiveFile 600 + ") to make room for similarly named file."); 601 } 602 } else { 603 LOG.info("Backed up archive file from {} to {}.", archiveFile, backedupArchiveFile); 604 } 605 } 606 607 LOG.trace("No existing file in archive for {}, free to archive original file.", archiveFile); 608 609 // at this point, we should have a free spot for the archive file 610 boolean success = false; 611 for (int i = 0; !success && i < DEFAULT_RETRIES_NUMBER; ++i) { 612 if (i > 0) { 613 // Ensure that the archive directory exists. 614 // The previous "move to archive" operation has failed probably because 615 // the cleaner has removed our archive directory (HBASE-7643). 616 // (we're in a retry loop, so don't worry too much about the exception) 617 try { 618 if (!fs.exists(archiveDir)) { 619 if (fs.mkdirs(archiveDir)) { 620 LOG.debug("Created archive directory {}", archiveDir); 621 } 622 } 623 } catch (IOException e) { 624 LOG.warn("Failed to create directory {}", archiveDir, e); 625 } 626 } 627 628 try { 629 success = currentFile.moveAndClose(archiveFile); 630 } catch (FileNotFoundException fnfe) { 631 LOG.warn("Failed to archive " + currentFile 632 + " because it does not exist! Skipping and continuing on.", fnfe); 633 success = true; 634 } catch (IOException e) { 635 success = false; 636 // When HFiles are placed on a filesystem other than HDFS a rename operation can be a 637 // non-atomic file copy operation. It can take a long time to copy a large hfile and if 638 // interrupted there may be a partially copied file present at the destination. We must 639 // remove the partially copied file, if any, or otherwise the archive operation will fail 640 // indefinitely from this point. 641 LOG.warn("Failed to archive " + currentFile + " on try #" + i, e); 642 try { 643 fs.delete(archiveFile, false); 644 } catch (FileNotFoundException fnfe) { 645 // This case is fine. 646 } catch (IOException ee) { 647 // Complain about other IO exceptions 648 LOG.warn("Failed to clean up from failure to archive " + currentFile + " on try #" + i, 649 ee); 650 } 651 } 652 } 653 654 if (!success) { 655 LOG.error("Failed to archive " + currentFile); 656 return false; 657 } 658 659 LOG.debug("Archived from {} to {}", currentFile, archiveFile); 660 return true; 661 } 662 663 /** 664 * Without regard for backup, delete a region. Should be used with caution. 665 * @param regionDir {@link Path} to the region to be deleted. 666 * @param fs FileSystem from which to delete the region 667 * @return <tt>true</tt> on successful deletion, <tt>false</tt> otherwise 668 * @throws IOException on filesystem operation failure 669 */ 670 private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir) 671 throws IOException { 672 if (fs.delete(regionDir, true)) { 673 LOG.debug("Deleted {}", regionDir); 674 return true; 675 } 676 LOG.debug("Failed to delete directory {}", regionDir); 677 return false; 678 } 679 680 /** 681 * Just do a simple delete of the given store files 682 * <p> 683 * A best effort is made to delete each of the files, rather than bailing on the first failure. 684 * <p> 685 * @param compactedFiles store files to delete from the file system. 686 * @throws IOException if a file cannot be deleted. All files will be attempted to deleted before 687 * throwing the exception, rather than failing at the first file. 688 */ 689 private static void deleteStoreFilesWithoutArchiving(Collection<HStoreFile> compactedFiles) 690 throws IOException { 691 LOG.debug("Deleting files without archiving."); 692 List<IOException> errors = new ArrayList<>(0); 693 for (HStoreFile hsf : compactedFiles) { 694 try { 695 hsf.deleteStoreFile(); 696 } catch (IOException e) { 697 LOG.error("Failed to delete {}", hsf.getPath()); 698 errors.add(e); 699 } 700 } 701 if (errors.size() > 0) { 702 throw MultipleIOException.createIOException(errors); 703 } 704 } 705 706 /** 707 * Adapt a type to match the {@link File} interface, which is used internally for handling 708 * archival/removal of files 709 * @param <T> type to adapt to the {@link File} interface 710 */ 711 private static abstract class FileConverter<T> implements Function<T, File> { 712 protected final FileSystem fs; 713 714 public FileConverter(FileSystem fs) { 715 this.fs = fs; 716 } 717 } 718 719 /** 720 * Convert a FileStatus to something we can manage in the archiving 721 */ 722 private static class FileStatusConverter extends FileConverter<FileStatus> { 723 public FileStatusConverter(FileSystem fs) { 724 super(fs); 725 } 726 727 @Override 728 public File apply(FileStatus input) { 729 return new FileablePath(fs, input.getPath()); 730 } 731 } 732 733 /** 734 * Convert the {@link HStoreFile} into something we can manage in the archive methods 735 */ 736 private static class StoreToFile extends FileConverter<HStoreFile> { 737 public StoreToFile(FileSystem fs) { 738 super(fs); 739 } 740 741 @Override 742 public File apply(HStoreFile input) { 743 return new FileableStoreFile(fs, input); 744 } 745 } 746 747 /** 748 * Wrapper to handle file operations uniformly 749 */ 750 private static abstract class File { 751 protected final FileSystem fs; 752 753 public File(FileSystem fs) { 754 this.fs = fs; 755 } 756 757 /** 758 * Delete the file 759 * @throws IOException on failure 760 */ 761 abstract void delete() throws IOException; 762 763 /** 764 * Check to see if this is a file or a directory 765 * @return <tt>true</tt> if it is a file, <tt>false</tt> otherwise 766 * @throws IOException on {@link FileSystem} connection error 767 */ 768 abstract boolean isFile() throws IOException; 769 770 /** 771 * @return if this is a directory, returns all the children in the directory, otherwise returns 772 * an empty list 773 */ 774 abstract Collection<File> getChildren() throws IOException; 775 776 /** 777 * close any outside readers of the file 778 */ 779 abstract void close() throws IOException; 780 781 /** Returns the name of the file (not the full fs path, just the individual file name) */ 782 abstract String getName(); 783 784 /** Returns the path to this file */ 785 abstract Path getPath(); 786 787 /** 788 * Move the file to the given destination 789 * @return <tt>true</tt> on success 790 */ 791 public boolean moveAndClose(Path dest) throws IOException { 792 this.close(); 793 Path p = this.getPath(); 794 return CommonFSUtils.renameAndSetModifyTime(fs, p, dest); 795 } 796 797 /** Returns the {@link FileSystem} on which this file resides */ 798 public FileSystem getFileSystem() { 799 return this.fs; 800 } 801 802 @Override 803 public String toString() { 804 return this.getClass().getSimpleName() + ", " + getPath().toString(); 805 } 806 } 807 808 /** 809 * A {@link File} that wraps a simple {@link Path} on a {@link FileSystem}. 810 */ 811 private static class FileablePath extends File { 812 private final Path file; 813 private final FileStatusConverter getAsFile; 814 815 public FileablePath(FileSystem fs, Path file) { 816 super(fs); 817 this.file = file; 818 this.getAsFile = new FileStatusConverter(fs); 819 } 820 821 @Override 822 public void delete() throws IOException { 823 if (!fs.delete(file, true)) throw new IOException("Failed to delete:" + this.file); 824 } 825 826 @Override 827 public String getName() { 828 return file.getName(); 829 } 830 831 @Override 832 public Collection<File> getChildren() throws IOException { 833 if (fs.isFile(file)) { 834 return Collections.emptyList(); 835 } 836 return Stream.of(fs.listStatus(file)).map(getAsFile).collect(Collectors.toList()); 837 } 838 839 @Override 840 public boolean isFile() throws IOException { 841 return fs.isFile(file); 842 } 843 844 @Override 845 public void close() throws IOException { 846 // NOOP - files are implicitly closed on removal 847 } 848 849 @Override 850 Path getPath() { 851 return file; 852 } 853 } 854 855 /** 856 * {@link File} adapter for a {@link HStoreFile} living on a {@link FileSystem} . 857 */ 858 private static class FileableStoreFile extends File { 859 HStoreFile file; 860 861 public FileableStoreFile(FileSystem fs, HStoreFile store) { 862 super(fs); 863 this.file = store; 864 } 865 866 @Override 867 public void delete() throws IOException { 868 file.deleteStoreFile(); 869 } 870 871 @Override 872 public String getName() { 873 return file.getPath().getName(); 874 } 875 876 @Override 877 public boolean isFile() { 878 return true; 879 } 880 881 @Override 882 public Collection<File> getChildren() throws IOException { 883 // storefiles don't have children 884 return Collections.emptyList(); 885 } 886 887 @Override 888 public void close() throws IOException { 889 file.closeStoreFile(true); 890 } 891 892 @Override 893 Path getPath() { 894 return file.getPath(); 895 } 896 } 897}