/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
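 * <p>
 * A minimal usage sketch (assuming the caller has already prepared a {@link Connection}, a
 * registered backup id and a {@link BackupRequest}, as the backup framework normally does):
 *
 * <pre>{@code
 * IncrementalTableBackupClient client =
 *   new IncrementalTableBackupClient(conn, backupId, request);
 * // runs the PREPARE_INCREMENTAL and INCREMENTAL_COPY phases, then completes the backup
 * client.execute();
 * }</pre>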
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

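  /**
   * Filters out WAL files that can no longer be found on the source filesystem. A missing file is
   * assumed to have been archived and already copied by a previous backup; files that still
   * exist, or that sit in the active WAL directory, are kept.
   * @param incrBackupFileList candidate WAL file paths
   * @return the subset of paths that should still be processed by this backup
   */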
  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Check if a given path belongs to the active WAL directory
   * @param p path to check
   * @return true if the path is not an archived WAL file
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

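  /**
   * Returns the position of {@code tbl} in {@code sTableList}, or -1 if the table is not present.
   * If the list is {@code null}, 0 is returned.
   */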
  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup system table, iterates through the records and forms
   * the paths for the bulk loaded hfiles, then copies those hfiles to the backup destination. This
   * method does NOT clean up the entries in the bulk load system table; those entries should not
   * be cleaned until the backup is marked as complete.
   * @param sTableList list of tables to be backed up
   * @return the rowkeys of bulk loaded files
   */
  @SuppressWarnings("unchecked")
  protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
      backupManager.readBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
      .entrySet()) {
      TableName srcTable = tblEntry.getKey();

      int srcIdx = getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table list");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
        srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
        .getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
          .entrySet()) {
          String fam = famEntry.getKey();
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
            files = new ArrayList<>();
            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
          } else {
            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            if (fs.exists(p)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
                LOG.trace("copying " + p + " to " + tgt);
              }
              activeFiles.add(p.toString());
            } else if (fs.exists(archive)) {
              LOG.debug("copying archive " + archive + " to " + tgt);
              archiveFiles.add(archive.toString());
            }
            files.add(tgt);
          }
        }
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);

    return pair.getSecond();
  }

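  /**
   * Copies the bulk loaded hfiles collected by {@link #handleBulkLoad(List)} to the backup
   * destination. Active files may be moved to the archive directory while the copy is running, so
   * a failed copy of active files triggers {@link #updateFileLists(List, List)} and a retry;
   * archive files are copied in a final pass.
   */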
  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation,
        // so we need to handle this case properly
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check whether some files got archived and update the active and archive lists.
          // When a file is moved from the active to the archive directory,
          // the number of active files decreases.
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if nothing was archived - re-throw the exception
          throw e;
        }
      }
      // If the incremental copy fails for archived files, we will have partially loaded files
      // in the backup destination (only files from the active data directory). That is OK,
      // because the backup will be marked as FAILED and the data will be cleaned up
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

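  /**
   * Moves every path in {@code activeFiles} that no longer exists on the source filesystem (i.e.
   * it was archived while the copy was in progress) over to {@code archiveFiles}.
   */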
  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

  /**
   * @throws IOException                   If the execution of the backup fails
   * @throws ColumnFamilyMismatchException If the column families of the current table do not match
   *                                       the column families of the last full backup, in which
   *                                       case a full backup should be taken
   */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
    try {
      Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);

      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // set overall backup status: complete. Here we make sure to complete the backup.
    // After this checkpoint, even if a cancel request comes in, the backup is allowed to finish
    try {
      // Set the previousTimestampMap, which covers the period before the current log roll,
      // in the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      List<byte[]> bulkLoadedRows = handleBulkLoad(backupInfo.getTableNames());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      backupManager.deleteBulkLoadedRows(bulkLoadedRows);
    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

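  /**
   * Runs the configured {@link BackupCopyJob} (a DistCp-based copy) to transfer the given files
   * or directories to {@code backupDest}. The destination is appended as the last element of the
   * argument array passed to the copy job. The temporary bulk load output directory is removed on
   * exit, whether the copy succeeds or fails.
   */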
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Copy of incremental HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

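  /**
   * Converts the WAL files accumulated since the previous backup into HFiles by running a
   * {@link WALPlayer} job in bulk output mode. Files that no longer exist are filtered out first,
   * and tables that have been dropped since the backup set was defined are skipped.
   */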
  protected void convertWALsToHFiles() throws IOException {
    // get the incremental backup file list (the WAL files to convert)
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

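  /**
   * Runs a {@link WALPlayer} job over the given WAL directories, writing HFiles for the listed
   * tables into the bulk output directory returned by {@link #getBulkOutputDir()}.
   * @param dirPaths  WAL directories/files to replay; joined with ';' because WAL file names may
   *                  contain ','
   * @param tableList tables to generate HFiles for
   */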
  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // The player reads all files in an arbitrary directory structure and creates
    // a Map task for each file. We use ';' as the separator
    // because WAL file names contain ','
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

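  /**
   * Returns the per-table bulk output directory:
   * {@code <bulkOutputDir>/<namespace>/<table qualifier>/data}.
   */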
  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

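  /**
   * Returns the temporary bulk output directory for this backup:
   * {@code <backupRootDir>/.tmp/<backupId>}.
   */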
  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }

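  /**
   * Builds a map from each table in the backup ancestry to the id of the most recent FULL backup
   * that covers it.
   */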
  private Map<TableName, String> getFullBackupIds() throws IOException {
    // Ancestors are stored from newest to oldest, so we can iterate backwards
    // in order to populate our backupId map with the most recent full backup
    // for a given table
    List<BackupManifest.BackupImage> images = getAncestors(backupInfo);
    Map<TableName, String> results = new HashMap<>();
    for (int i = images.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = images.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }

      for (TableName tn : image.getTableNames()) {
        results.put(tn, image.getBackupId());
      }
    }
    return results;
  }

  /**
   * Verifies that the column families of the current table descriptors match the column families
   * recorded by the last full backup of each table. This ensures CF compatibility across
   * incremental backups. If a mismatch is detected, a full table backup should be taken rather
   * than an incremental one.
   */
  private void verifyCfCompatibility(Set<TableName> tables,
    Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException {
    ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder =
      ColumnFamilyMismatchException.newBuilder();
    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tn : tables) {
        String backupId = tablesToFullBackupId.get(tn);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId);

        ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies();
        String snapshotName = fullBackupInfo.getSnapshotName(tn);
        Path root = HBackupFileSystem.getTableBackupPath(tn,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId());
        Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root);

        FileSystem fs;
        try {
          fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf);
        } catch (URISyntaxException e) {
          throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e);
        }

        SnapshotProtos.SnapshotDescription snapshotDescription =
          SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
        SnapshotManifest manifest =
          SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription);

        ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies();
        if (!areCfsCompatible(currentCfs, backupCfs)) {
          exBuilder.addMismatchedTable(tn, currentCfs, backupCfs);
        }
      }
    }

    ColumnFamilyMismatchException ex = exBuilder.build();
    if (!ex.getMismatchedTables().isEmpty()) {
      throw ex;
    }
  }

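  /**
   * Two sets of column family descriptors are considered compatible when they have the same
   * length and the families at each position have the same name.
   */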
  private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs,
    ColumnFamilyDescriptor[] backupCfs) {
    if (currentCfs.length != backupCfs.length) {
      return false;
    }

    for (int i = 0; i < backupCfs.length; i++) {
      String currentCf = currentCfs[i].getNameAsString();
      String backupCf = backupCfs[i].getNameAsString();

      if (!currentCf.equals(backupCf)) {
        return false;
      }
    }

    return true;
  }
}