001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; 021 022import java.io.IOException; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import org.apache.commons.io.FilenameUtils; 031import org.apache.commons.lang3.StringUtils; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.LocatedFileStatus; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.fs.RemoteIterator; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.backup.BackupCopyJob; 038import org.apache.hadoop.hbase.backup.BackupInfo; 039import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; 040import org.apache.hadoop.hbase.backup.BackupRequest; 041import org.apache.hadoop.hbase.backup.BackupRestoreFactory; 042import org.apache.hadoop.hbase.backup.BackupType; 043import org.apache.hadoop.hbase.backup.HBackupFileSystem; 044import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; 045import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; 046import org.apache.hadoop.hbase.backup.util.BackupUtils; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 049import org.apache.hadoop.hbase.client.Connection; 050import org.apache.hadoop.hbase.io.hfile.HFile; 051import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; 052import org.apache.hadoop.hbase.mapreduce.WALPlayer; 053import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; 054import org.apache.hadoop.hbase.snapshot.SnapshotManifest; 055import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator; 056import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException; 057import org.apache.hadoop.hbase.util.CommonFSUtils; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.util.HFileArchiveUtil; 060import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 061import org.apache.hadoop.util.Tool; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 067 068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 070 071/** 072 * Incremental backup implementation. See the {@link #execute() execute} method. 073 */ 074@InterfaceAudience.Private 075public class IncrementalTableBackupClient extends TableBackupClient { 076 private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class); 077 078 protected IncrementalTableBackupClient() { 079 } 080 081 public IncrementalTableBackupClient(final Connection conn, final String backupId, 082 BackupRequest request) throws IOException { 083 super(conn, backupId, request); 084 } 085 086 protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException { 087 List<String> list = new ArrayList<>(); 088 for (String file : incrBackupFileList) { 089 Path p = new Path(file); 090 if (fs.exists(p) || isActiveWalPath(p)) { 091 list.add(file); 092 } else { 093 LOG.warn("Can't find file: " + file); 094 } 095 } 096 return list; 097 } 098 099 /** 100 * Check if a given path is belongs to active WAL directory 101 * @param p path 102 * @return true, if yes 103 */ 104 protected boolean isActiveWalPath(Path p) { 105 return !AbstractFSWALProvider.isArchivedLogFile(p); 106 } 107 108 protected static int getIndex(TableName tbl, List<TableName> sTableList) { 109 if (sTableList == null) { 110 return 0; 111 } 112 113 for (int i = 0; i < sTableList.size(); i++) { 114 if (tbl.equals(sTableList.get(i))) { 115 return i; 116 } 117 } 118 return -1; 119 } 120 121 /** 122 * Reads bulk load records from backup table, iterates through the records and forms the paths for 123 * bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT 124 * clean up the entries in the bulk load system table. Those entries should not be cleaned until 125 * the backup is marked as complete. 126 * @param tablesToBackup list of tables to be backed up 127 */ 128 protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException { 129 Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>(); 130 List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup); 131 FileSystem tgtFs; 132 try { 133 tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); 134 } catch (URISyntaxException use) { 135 throw new IOException("Unable to get FileSystem", use); 136 } 137 Path rootdir = CommonFSUtils.getRootDir(conf); 138 Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); 139 140 for (BulkLoad bulkLoad : bulkLoads) { 141 TableName srcTable = bulkLoad.getTableName(); 142 MergeSplitBulkloadInfo bulkloadInfo = 143 toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new); 144 String regionName = bulkLoad.getRegion(); 145 String fam = bulkLoad.getColumnFamily(); 146 String filename = FilenameUtils.getName(bulkLoad.getHfilePath()); 147 148 if (!tablesToBackup.contains(srcTable)) { 149 LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable); 150 continue; 151 } 152 Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); 153 Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); 154 155 String srcTableQualifier = srcTable.getQualifierAsString(); 156 String srcTableNs = srcTable.getNamespaceAsString(); 157 Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier 158 + Path.SEPARATOR + regionName + Path.SEPARATOR + fam); 159 if (!tgtFs.mkdirs(tgtFam)) { 160 throw new IOException("couldn't create " + tgtFam); 161 } 162 Path tgt = new Path(tgtFam, filename); 163 164 Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); 165 Path archive = new Path(archiveDir, filename); 166 167 if (fs.exists(p)) { 168 if (LOG.isTraceEnabled()) { 169 LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(), 170 srcTableQualifier); 171 LOG.trace("copying {} to {}", p, tgt); 172 } 173 bulkloadInfo.addActiveFile(p.toString()); 174 } else if (fs.exists(archive)) { 175 LOG.debug("copying archive {} to {}", archive, tgt); 176 bulkloadInfo.addArchiveFiles(archive.toString()); 177 } 178 } 179 180 for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) { 181 mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(), 182 bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs); 183 } 184 185 return bulkLoads; 186 } 187 188 private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles, 189 List<String> archiveFiles, TableName tn, FileSystem tgtFs) throws IOException { 190 int attempt = 1; 191 192 while (!activeFiles.isEmpty()) { 193 LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++); 194 // Active file can be archived during copy operation, 195 // we need to handle this properly 196 try { 197 mergeSplitAndCopyBulkloadedHFiles(activeFiles, tn, tgtFs); 198 break; 199 } catch (IOException e) { 200 int numActiveFiles = activeFiles.size(); 201 updateFileLists(activeFiles, archiveFiles); 202 if (activeFiles.size() < numActiveFiles) { 203 // We've archived some files, delete bulkloads directory 204 // and re-try 205 deleteBulkLoadDirectory(); 206 continue; 207 } 208 209 throw e; 210 } 211 } 212 213 if (!archiveFiles.isEmpty()) { 214 mergeSplitAndCopyBulkloadedHFiles(archiveFiles, tn, tgtFs); 215 } 216 } 217 218 private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs) 219 throws IOException { 220 MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob(); 221 conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, 222 getBulkOutputDirForTable(tn).toString()); 223 player.setConf(conf); 224 225 String inputDirs = StringUtils.join(files, ","); 226 String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() }; 227 228 int result; 229 230 try { 231 result = player.run(args); 232 } catch (Exception e) { 233 LOG.error("Failed to run MapReduceHFileSplitterJob", e); 234 // Delete the bulkload directory if we fail to run the HFile splitter job for any reason 235 // as it might be re-tried 236 deleteBulkLoadDirectory(); 237 throw new IOException(e); 238 } 239 240 if (result != 0) { 241 throw new IOException( 242 "Failed to run MapReduceHFileSplitterJob with invalid result: " + result); 243 } 244 245 incrementalCopyBulkloadHFiles(tgtFs, tn); 246 } 247 248 public void updateFileLists(List<String> activeFiles, List<String> archiveFiles) 249 throws IOException { 250 List<String> newlyArchived = new ArrayList<>(); 251 252 for (String spath : activeFiles) { 253 if (!fs.exists(new Path(spath))) { 254 newlyArchived.add(spath); 255 } 256 } 257 258 if (!newlyArchived.isEmpty()) { 259 String rootDir = CommonFSUtils.getRootDir(conf).toString(); 260 261 activeFiles.removeAll(newlyArchived); 262 for (String file : newlyArchived) { 263 String archivedFile = file.substring(rootDir.length() + 1); 264 Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile); 265 archivedFile = archivedFilePath.toString(); 266 267 if (!fs.exists(archivedFilePath)) { 268 throw new IOException(String.format( 269 "File %s no longer exists, and no archived file %s exists for it", file, archivedFile)); 270 } 271 272 LOG.debug("Archived file {} has been updated", archivedFile); 273 archiveFiles.add(archivedFile); 274 } 275 } 276 277 LOG.debug(newlyArchived.size() + " files have been archived."); 278 } 279 280 /** 281 * @throws IOException If the execution of the backup fails 282 * @throws ColumnFamilyMismatchException If the column families of the current table do not match 283 * the column families for the last full backup. In which 284 * case, a full backup should be taken 285 */ 286 @Override 287 public void execute() throws IOException, ColumnFamilyMismatchException { 288 try { 289 Map<TableName, String> tablesToFullBackupIds = getFullBackupIds(); 290 verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds); 291 292 // case PREPARE_INCREMENTAL: 293 beginBackup(backupManager, backupInfo); 294 backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); 295 LOG.debug("For incremental backup, current table set is " 296 + backupManager.getIncrementalBackupTableSet()); 297 newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); 298 } catch (Exception e) { 299 // fail the overall backup and return 300 failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", 301 BackupType.INCREMENTAL, conf); 302 throw new IOException(e); 303 } 304 305 // case INCREMENTAL_COPY: 306 try { 307 // copy out the table and region info files for each table 308 BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); 309 setupRegionLocator(); 310 // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT 311 convertWALsToHFiles(); 312 incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, 313 backupInfo.getBackupRootDir()); 314 } catch (Exception e) { 315 String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId; 316 // fail the overall backup and return 317 failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf); 318 throw new IOException(e); 319 } 320 // case INCR_BACKUP_COMPLETE: 321 // set overall backup status: complete. Here we make sure to complete the backup. 322 // After this checkpoint, even if entering cancel process, will let the backup finished 323 try { 324 // Set the previousTimestampMap which is before this current log roll to the manifest. 325 Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap(); 326 backupInfo.setIncrTimestampMap(previousTimestampMap); 327 328 // The table list in backupInfo is good for both full backup and incremental backup. 329 // For incremental backup, it contains the incremental backup table set. 330 backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); 331 332 Map<TableName, Map<String, Long>> newTableSetTimestampMap = 333 backupManager.readLogTimestampMap(); 334 335 List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames()); 336 337 // backup complete 338 backupInfo.setTableSetTimestampMap(newTableSetTimestampMap); 339 completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf); 340 341 List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey); 342 backupManager.deleteBulkLoadedRows(bulkLoadedRows); 343 } catch (IOException e) { 344 failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", 345 BackupType.INCREMENTAL, conf); 346 throw new IOException(e); 347 } 348 } 349 350 protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException { 351 boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf); 352 try { 353 LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest); 354 // set overall backup phase: incremental_copy 355 backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY); 356 // get incremental backup file list and prepare parms for DistCp 357 String[] strArr = new String[files.length + 1]; 358 System.arraycopy(files, 0, strArr, 0, files.length); 359 strArr[strArr.length - 1] = backupDest; 360 361 String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId(); 362 if (LOG.isDebugEnabled()) { 363 LOG.debug("Setting incremental copy HFiles job name to : " + jobname); 364 } 365 conf.set(JOB_NAME_CONF_KEY, jobname); 366 conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); 367 368 BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); 369 int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); 370 if (res != 0) { 371 LOG.error("Copy incremental HFile files failed with return code: " + res + "."); 372 throw new IOException( 373 "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest); 374 } 375 LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest 376 + " finished."); 377 } finally { 378 deleteBulkLoadDirectory(); 379 conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, 380 diskBasedSortingOriginalValue); 381 } 382 } 383 384 protected void deleteBulkLoadDirectory() throws IOException { 385 // delete original bulk load directory on method exit 386 Path path = getBulkOutputDir(); 387 FileSystem fs = FileSystem.get(path.toUri(), conf); 388 boolean result = fs.delete(path, true); 389 if (!result) { 390 LOG.warn("Could not delete " + path); 391 } 392 } 393 394 protected void convertWALsToHFiles() throws IOException { 395 // get incremental backup file list and prepare parameters for DistCp 396 List<String> incrBackupFileList = backupInfo.getIncrBackupFileList(); 397 // Get list of tables in incremental backup set 398 Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet(); 399 // filter missing files out (they have been copied by previous backups) 400 incrBackupFileList = filterMissingFiles(incrBackupFileList); 401 List<String> tableList = new ArrayList<String>(); 402 for (TableName table : tableSet) { 403 // Check if table exists 404 if (tableExists(table, conn)) { 405 tableList.add(table.getNameAsString()); 406 } else { 407 LOG.warn("Table " + table + " does not exists. Skipping in WAL converter"); 408 } 409 } 410 walToHFiles(incrBackupFileList, tableList); 411 412 } 413 414 protected boolean tableExists(TableName table, Connection conn) throws IOException { 415 try (Admin admin = conn.getAdmin()) { 416 return admin.tableExists(table); 417 } 418 } 419 420 protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException { 421 Tool player = new WALPlayer(); 422 423 // Player reads all files in arbitrary directory structure and creates 424 // a Map task for each file. We use ';' as separator 425 // because WAL file names contains ',' 426 String dirs = StringUtils.join(dirPaths, ';'); 427 String jobname = "Incremental_Backup-" + backupId; 428 429 Path bulkOutputPath = getBulkOutputDir(); 430 conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); 431 conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); 432 conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); 433 conf.set(JOB_NAME_CONF_KEY, jobname); 434 435 boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf); 436 conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); 437 String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; 438 439 try { 440 player.setConf(conf); 441 int result = player.run(playerArgs); 442 if (result != 0) { 443 throw new IOException("WAL Player failed"); 444 } 445 } catch (IOException e) { 446 throw e; 447 } catch (Exception ee) { 448 throw new IOException("Can not convert from directory " + dirs 449 + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee); 450 } finally { 451 conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, 452 diskBasedSortingEnabledOriginalValue); 453 conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); 454 conf.unset(JOB_NAME_CONF_KEY); 455 } 456 } 457 458 private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException { 459 Path bulkOutDir = getBulkOutputDirForTable(tn); 460 461 if (tgtFs.exists(bulkOutDir)) { 462 conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2); 463 Path tgtPath = getTargetDirForTable(tn); 464 try { 465 RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true); 466 List<String> files = new ArrayList<>(); 467 while (locatedFiles.hasNext()) { 468 LocatedFileStatus file = locatedFiles.next(); 469 if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) { 470 files.add(file.getPath().toString()); 471 } 472 } 473 incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString()); 474 } finally { 475 conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY); 476 } 477 } 478 } 479 480 protected Path getBulkOutputDirForTable(TableName table) { 481 Path tablePath = getBulkOutputDir(); 482 tablePath = new Path(tablePath, table.getNamespaceAsString()); 483 tablePath = new Path(tablePath, table.getQualifierAsString()); 484 return new Path(tablePath, "data"); 485 } 486 487 protected Path getBulkOutputDir() { 488 String backupId = backupInfo.getBackupId(); 489 Path path = new Path(backupInfo.getBackupRootDir()); 490 path = new Path(path, ".tmp"); 491 path = new Path(path, backupId); 492 return path; 493 } 494 495 private Path getTargetDirForTable(TableName table) { 496 Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId()); 497 path = new Path(path, table.getNamespaceAsString()); 498 path = new Path(path, table.getQualifierAsString()); 499 return path; 500 } 501 502 private void setupRegionLocator() throws IOException { 503 Map<TableName, String> fullBackupIds = getFullBackupIds(); 504 try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) { 505 506 for (TableName tableName : backupInfo.getTables()) { 507 String fullBackupId = fullBackupIds.get(tableName); 508 BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId); 509 String snapshotName = fullBackupInfo.getSnapshotName(tableName); 510 Path root = HBackupFileSystem.getTableBackupPath(tableName, 511 new Path(fullBackupInfo.getBackupRootDir()), fullBackupId); 512 String manifestDir = 513 SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString(); 514 SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName); 515 } 516 } 517 } 518 519 private Map<TableName, String> getFullBackupIds() throws IOException { 520 // Ancestors are stored from newest to oldest, so we can iterate backwards 521 // in order to populate our backupId map with the most recent full backup 522 // for a given table 523 List<BackupManifest.BackupImage> images = getAncestors(backupInfo); 524 Map<TableName, String> results = new HashMap<>(); 525 for (int i = images.size() - 1; i >= 0; i--) { 526 BackupManifest.BackupImage image = images.get(i); 527 if (image.getType() != BackupType.FULL) { 528 continue; 529 } 530 531 for (TableName tn : image.getTableNames()) { 532 results.put(tn, image.getBackupId()); 533 } 534 } 535 return results; 536 } 537 538 /** 539 * Verifies that the current table descriptor CFs matches the descriptor CFs of the last full 540 * backup for the tables. This ensures CF compatibility across incremental backups. If a mismatch 541 * is detected, a full table backup should be taken, rather than an incremental one 542 */ 543 private void verifyCfCompatibility(Set<TableName> tables, 544 Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException { 545 ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder = 546 ColumnFamilyMismatchException.newBuilder(); 547 try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) { 548 for (TableName tn : tables) { 549 String backupId = tablesToFullBackupId.get(tn); 550 BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId); 551 552 ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies(); 553 String snapshotName = fullBackupInfo.getSnapshotName(tn); 554 Path root = HBackupFileSystem.getTableBackupPath(tn, 555 new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId()); 556 Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root); 557 558 FileSystem fs; 559 try { 560 fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf); 561 } catch (URISyntaxException e) { 562 throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e); 563 } 564 565 SnapshotProtos.SnapshotDescription snapshotDescription = 566 SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir); 567 SnapshotManifest manifest = 568 SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription); 569 if ( 570 SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDescription.getTtl(), 571 snapshotDescription.getCreationTime(), EnvironmentEdgeManager.currentTime()) 572 ) { 573 throw new SnapshotTTLExpiredException( 574 ProtobufUtil.createSnapshotDesc(snapshotDescription)); 575 } 576 577 ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies(); 578 if (!areCfsCompatible(currentCfs, backupCfs)) { 579 exBuilder.addMismatchedTable(tn, currentCfs, backupCfs); 580 } 581 } 582 } 583 584 ColumnFamilyMismatchException ex = exBuilder.build(); 585 if (!ex.getMismatchedTables().isEmpty()) { 586 throw ex; 587 } 588 } 589 590 private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs, 591 ColumnFamilyDescriptor[] backupCfs) { 592 if (currentCfs.length != backupCfs.length) { 593 return false; 594 } 595 596 for (int i = 0; i < backupCfs.length; i++) { 597 String currentCf = currentCfs[i].getNameAsString(); 598 String backupCf = backupCfs[i].getNameAsString(); 599 600 if (!currentCf.equals(backupCf)) { 601 return false; 602 } 603 } 604 605 return true; 606 } 607}