/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

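  /**
   * Filters the candidate WAL file list, keeping only files that still exist or that are still
   * active (not yet archived) WALs. Files that can no longer be found have already been copied by
   * previous backups and are logged and dropped.
   * @param incrBackupFileList WAL file paths collected for the incremental backup
   * @return the subset of paths that still exist or point to active WALs
   */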
  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Checks whether a given path belongs to the active WAL directory, i.e. the file has not yet
   * been archived.
   * @param p path to check
   * @return true if the path points to an active (non-archived) WAL file
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

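  /**
   * Returns the position of the given table in the table list.
   * @param tbl        table to look up
   * @param sTableList list of tables; if null, 0 is returned
   * @return index of the table in the list, or -1 if the table is not present
   */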
  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup system table, iterates through the records and forms
   * the paths for bulk loaded hfiles. Copies the bulk loaded hfiles to the backup destination. This
   * method does NOT clean up the entries in the bulk load system table. Those entries should not be
   * cleaned until the backup is marked as complete.
   * @param tablesToBackup list of tables to be backed up
   * @return the bulk load records that were processed for the given tables
   */
  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
    Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (BulkLoad bulkLoad : bulkLoads) {
      TableName srcTable = bulkLoad.getTableName();
      MergeSplitBulkloadInfo bulkloadInfo =
        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
      String regionName = bulkLoad.getRegion();
      String fam = bulkLoad.getColumnFamily();
      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());

      if (!tablesToBackup.contains(srcTable)) {
        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
        continue;
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

      String srcTableQualifier = srcTable.getQualifierAsString();
      String srcTableNs = srcTable.getNamespaceAsString();
      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
      if (!tgtFs.mkdirs(tgtFam)) {
        throw new IOException("couldn't create " + tgtFam);
      }
      Path tgt = new Path(tgtFam, filename);

      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
      Path archive = new Path(archiveDir, filename);

      if (fs.exists(p)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
            srcTableQualifier);
          LOG.trace("copying {} to {}", p, tgt);
        }
        bulkloadInfo.addActiveFile(p.toString());
      } else if (fs.exists(archive)) {
        LOG.debug("copying archive {} to {}", archive, tgt);
        bulkloadInfo.addArchiveFiles(archive.toString());
      }
    }

    for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
      mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
        bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
    }

    return bulkLoads;
  }

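  /**
   * Merges, splits and copies both active and archived bulk loaded HFiles of a table to the backup
   * destination. Active files may be archived while the copy is in progress, so on failure any
   * newly archived files are moved to the archive list and the copy of the remaining active files
   * is retried.
   * @param activeFiles  bulk loaded HFiles that are still under the table directory
   * @param archiveFiles bulk loaded HFiles that have already been archived
   * @param tn           table the files belong to
   * @param tgtFs        file system of the backup destination
   */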
  private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
    List<String> archiveFiles, TableName tn, FileSystem tgtFs) throws IOException {
    int attempt = 1;

    while (!activeFiles.isEmpty()) {
      LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
      // An active file can be archived during the copy operation,
      // so we need to handle this properly
      try {
        mergeSplitAndCopyBulkloadedHFiles(activeFiles, tn, tgtFs);
        break;
      } catch (IOException e) {
        int numActiveFiles = activeFiles.size();
        updateFileLists(activeFiles, archiveFiles);
        if (activeFiles.size() < numActiveFiles) {
          // Some files have been archived since the last check; delete the bulkload
          // directory and retry
          deleteBulkLoadDirectory();
          continue;
        }

        throw e;
      }
    }

    if (!archiveFiles.isEmpty()) {
      mergeSplitAndCopyBulkloadedHFiles(archiveFiles, tn, tgtFs);
    }
  }

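  /**
   * Runs {@link MapReduceHFileSplitterJob} over the given bulk loaded HFiles to re-split them
   * along region boundaries, then copies the resulting HFiles to the backup destination. The
   * temporary bulk output directory is deleted if the splitter job fails.
   * @param files bulk loaded HFiles to process
   * @param tn    table the files belong to
   * @param tgtFs file system of the backup destination
   */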
  private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs)
    throws IOException {
    MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
      getBulkOutputDirForTable(tn).toString());
    player.setConf(conf);

    String inputDirs = StringUtils.join(files, ",");
    String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };

    int result;

    try {
      result = player.run(args);
    } catch (Exception e) {
      LOG.error("Failed to run MapReduceHFileSplitterJob", e);
      // Delete the bulkload directory if the HFile splitter job fails for any reason,
      // since the operation might be retried
      deleteBulkLoadDirectory();
      throw new IOException(e);
    }

    if (result != 0) {
      throw new IOException(
        "Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
    }

    incrementalCopyBulkloadHFiles(tgtFs, tn);
  }

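  /**
   * Moves files that have disappeared from the active file list to the archive file list. For each
   * active file that no longer exists, the corresponding location under the HFile archive
   * directory is resolved and added to {@code archiveFiles}; if no archived copy exists either, an
   * exception is thrown.
   * @param activeFiles  list of active files; entries that were archived are removed in place
   * @param archiveFiles list of archived files; resolved archive locations are added in place
   */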
  public void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (!newlyArchived.isEmpty()) {
      String rootDir = CommonFSUtils.getRootDir(conf).toString();

      activeFiles.removeAll(newlyArchived);
      for (String file : newlyArchived) {
        String archivedFile = file.substring(rootDir.length() + 1);
        Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile);
        archivedFile = archivedFilePath.toString();

        if (!fs.exists(archivedFilePath)) {
          throw new IOException(String.format(
            "File %s no longer exists, and no archived file %s exists for it", file, archivedFile));
        }

        LOG.debug("Active file {} was archived to {}", file, archivedFile);
        archiveFiles.add(archivedFile);
      }
    }

    LOG.debug("{} files have been archived.", newlyArchived.size());
  }

  /**
   * @throws IOException                   If the execution of the backup fails
   * @throws ColumnFamilyMismatchException If the column families of the current table do not match
   *                                       the column families for the last full backup, in which
   *                                       case a full backup should be taken
   */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
    try {
      Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);

      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      setupRegionLocator();
      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // Set the overall backup status to complete. Here we make sure the backup is marked complete.
    // After this checkpoint, even if a cancel request comes in, the backup will be allowed to
    // finish.
    try {
      // Set the previousTimestampMap, which was taken before the current log roll, on the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
      backupManager.deleteBulkLoadedRows(bulkLoadedRows);
    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

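  /**
   * Copies the given HFile directories to the backup destination using the configured
   * {@link BackupCopyJob}. Disk based sorting is enabled for the duration of the copy and restored
   * to its original value afterwards; the temporary bulk load directory is deleted on exit.
   * @param files      source directories or files to copy
   * @param backupDest destination of the copy
   */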
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);
      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Incremental copy of HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
        diskBasedSortingOriginalValue);
    }
  }

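  /**
   * Deletes the temporary bulk output directory ({@code <backupRootDir>/.tmp/<backupId>}) used for
   * WAL conversion and bulk load splitting. A failed delete is only logged as a warning.
   */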
  protected void deleteBulkLoadDirectory() throws IOException {
    // delete the temporary bulk load output directory
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

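  /**
   * Converts the WAL files collected for this incremental backup into HFiles by running
   * {@link WALPlayer} in bulk output mode. Missing WAL files are filtered out first, and tables
   * that no longer exist are skipped.
   */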
  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

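  /**
   * Checks whether the given table exists on the cluster.
   * @param table table to check
   * @param conn  connection used for the admin call
   * @return true if the table exists
   */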
  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

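  /**
   * Runs {@link WALPlayer} over the given WAL directories and writes the resulting HFiles to the
   * bulk output directory. Multi-table support is enabled so a single job covers all tables in the
   * incremental backup set.
   * @param dirPaths  WAL directories/files to replay
   * @param tableList tables to generate HFiles for
   */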
  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // Player reads all files in an arbitrary directory structure and creates
    // a Map task for each file. We use ';' as separator
    // because WAL file names contain ','
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);

    boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    } finally {
      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
        diskBasedSortingEnabledOriginalValue);
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    }
  }

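  /**
   * Copies the split bulk loaded HFiles for the given table from its bulk output directory to the
   * table's directory under the backup destination. Only files in HFile format are copied, and the
   * last two path levels of the source files are preserved during the copy.
   * @param tgtFs file system of the backup destination
   * @param tn    table whose bulk loaded HFiles are copied
   */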
  private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
    Path bulkOutDir = getBulkOutputDirForTable(tn);

    if (tgtFs.exists(bulkOutDir)) {
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
      Path tgtPath = getTargetDirForTable(tn);
      try {
        RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
        List<String> files = new ArrayList<>();
        while (locatedFiles.hasNext()) {
          LocatedFileStatus file = locatedFiles.next();
          if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
            files.add(file.getPath().toString());
          }
        }
        incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString());
      } finally {
        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
      }
    }
  }

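  /**
   * Returns the per-table bulk output directory, i.e.
   * {@code <bulkOutputDir>/<namespace>/<qualifier>/data}.
   */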
  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

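  /**
   * Returns the temporary bulk output directory for this backup, i.e.
   * {@code <backupRootDir>/.tmp/<backupId>}.
   */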
  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }

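  /**
   * Returns the destination directory for a table in the backup image, i.e.
   * {@code <backupRootDir>/<backupId>/<namespace>/<qualifier>}.
   */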
  private Path getTargetDirForTable(TableName table) {
    Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
    path = new Path(path, table.getNamespaceAsString());
    path = new Path(path, table.getQualifierAsString());
    return path;
  }

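  /**
   * Configures {@link SnapshotRegionLocator} with the snapshot manifest directory of the most
   * recent full backup for each table in this backup, so that subsequent jobs can locate regions
   * from those snapshots.
   */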
  private void setupRegionLocator() throws IOException {
    Map<TableName, String> fullBackupIds = getFullBackupIds();
    try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tableName : backupInfo.getTables()) {
        String fullBackupId = fullBackupIds.get(tableName);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
        String snapshotName = fullBackupInfo.getSnapshotName(tableName);
        Path root = HBackupFileSystem.getTableBackupPath(tableName,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
        String manifestDir =
          SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
        SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
      }
    }
  }

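  /**
   * Maps each table covered by this backup's ancestors to the id of its most recent full backup.
   * @return map from table name to the backup id of the latest full backup containing that table
   */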
  private Map<TableName, String> getFullBackupIds() throws IOException {
    // Ancestors are stored from newest to oldest, so we can iterate backwards
    // in order to populate our backupId map with the most recent full backup
    // for a given table
    List<BackupManifest.BackupImage> images = getAncestors(backupInfo);
    Map<TableName, String> results = new HashMap<>();
    for (int i = images.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = images.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }

      for (TableName tn : image.getTableNames()) {
        results.put(tn, image.getBackupId());
      }
    }
    return results;
  }

  /**
   * Verifies that the column families of the current table descriptor match the column families
   * recorded by the last full backup for each table. This ensures CF compatibility across
   * incremental backups. If a mismatch is detected, a full table backup should be taken rather
   * than an incremental one.
   */
  private void verifyCfCompatibility(Set<TableName> tables,
    Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException {
    ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder =
      ColumnFamilyMismatchException.newBuilder();
    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tn : tables) {
        String backupId = tablesToFullBackupId.get(tn);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId);

        ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies();
        String snapshotName = fullBackupInfo.getSnapshotName(tn);
        Path root = HBackupFileSystem.getTableBackupPath(tn,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId());
        Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root);

        FileSystem fs;
        try {
          fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf);
        } catch (URISyntaxException e) {
          throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e);
        }

        SnapshotProtos.SnapshotDescription snapshotDescription =
          SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
        SnapshotManifest manifest =
          SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription);
        if (
          SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDescription.getTtl(),
            snapshotDescription.getCreationTime(), EnvironmentEdgeManager.currentTime())
        ) {
          throw new SnapshotTTLExpiredException(
            ProtobufUtil.createSnapshotDesc(snapshotDescription));
        }

        ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies();
        if (!areCfsCompatible(currentCfs, backupCfs)) {
          exBuilder.addMismatchedTable(tn, currentCfs, backupCfs);
        }
      }
    }

    ColumnFamilyMismatchException ex = exBuilder.build();
    if (!ex.getMismatchedTables().isEmpty()) {
      throw ex;
    }
  }

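  /**
   * Compares two column family descriptor arrays by name and position.
   * @return true if both arrays have the same length and the same family names in the same order
   */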
  private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs,
    ColumnFamilyDescriptor[] backupCfs) {
    if (currentCfs.length != backupCfs.length) {
      return false;
    }

    for (int i = 0; i < backupCfs.length; i++) {
      String currentCf = currentCfs[i].getNameAsString();
      String backupCf = backupCfs[i].getNameAsString();

      if (!currentCf.equals(backupCf)) {
        return false;
      }
    }

    return true;
  }
}