/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Full table backup implementation.
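 * <p>
 * A minimal usage sketch (illustrative only; in practice these clients are typically constructed
 * and driven by the backup framework, e.g. through {@code BackupAdmin}, rather than instantiated
 * directly):
 * </p>
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   BackupRequest request = new BackupRequest.Builder().withBackupType(BackupType.FULL)
 *     .withTableList(tables).withTargetRootDir(backupRootDir).build();
 *   new FullTableBackupClient(conn, backupId, request).execute();
 * }
 * }</pre>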
 */
@InterfaceAudience.Private
public class FullTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(FullTableBackupClient.class);

  public FullTableBackupClient() {
  }

  public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
    throws IOException {
    super(conn, backupId, request);
  }

  /**
   * Do snapshot copy.
   * @param backupInfo backup info
   * @throws Exception if exporting a snapshot fails
   */
  protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
    LOG.info("Snapshot copy is starting.");

    // set overall backup phase: snapshot_copy
    backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);

    // Call ExportSnapshot to copy files for the backup based on the HBase snapshots.
    // ExportSnapshot only supports exporting a single snapshot, so we loop when there are
    // multiple tables.
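    // For illustration only (all values hypothetical), the per-table argument list built below
    // resembles:
    //   -snapshot snapshot_1699000000000_default_t1
    //   -copy-to hdfs://backup-root/backup_1699000000000/default/t1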
    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);

    // the number of snapshots matches the number of tables
    int numOfSnapshots = backupInfo.getSnapshotNames().size();

    LOG.debug("There are " + numOfSnapshots + " snapshots to be copied.");
    for (TableName table : backupInfo.getTables()) {
      // Currently we weight each sub copy task equally by snapshot count; computing the real
      // file sizes for a more accurate progress percentage is left as future work.
      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
      int res;
      ArrayList<String> argsList = new ArrayList<>();
      argsList.add("-snapshot");
      argsList.add(backupInfo.getSnapshotName(table));
      argsList.add("-copy-to");
      argsList.add(backupInfo.getTableBackupDir(table));
      if (backupInfo.getBandwidth() > -1) {
        argsList.add("-bandwidth");
        argsList.add(String.valueOf(backupInfo.getBandwidth()));
      }
      if (backupInfo.getWorkers() > -1) {
        argsList.add("-mappers");
        argsList.add(String.valueOf(backupInfo.getWorkers()));
      }
      if (backupInfo.getNoChecksumVerify()) {
        argsList.add("-no-checksum-verify");
      }

      String[] args = argsList.toArray(new String[0]);

      String jobname = "Full-Backup_" + backupInfo.getBackupId() + "_" + table.getNameAsString();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting snapshot copy job name to: " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
      res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);

      // If one snapshot export fails, do not continue with the remaining snapshots.
      if (res != 0) {
        LOG.error("Exporting snapshot " + args[1] + " failed with return code: " + res + ".");

        throw new IOException("Failed to export snapshot " + args[1] + " to " + args[3]
          + " with return code " + res);
      }

      conf.unset(JOB_NAME_CONF_KEY);
      LOG.info("Snapshot copy " + args[1] + " finished.");
    }
  }

  /**
   * Backup request execution.
   * @throws IOException if the execution of the backup fails
   */
  @Override
  public void execute() throws IOException {
    try (Admin admin = conn.getAdmin()) {
      // Begin BACKUP
      beginBackup(backupManager, backupInfo);
      String savedStartCode;
      boolean firstBackup;
      // do snapshot for full table backup

      savedStartCode = backupManager.readBackupStartCode();
      firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
      if (firstBackup) {
        // This is our first backup. Write a marker to the system table so that the logs are
        // retained while we do the backup.
        backupManager.writeBackupStartCode(0L);
      }
      // We roll the logs here before we take the snapshots. It is possible there is duplicate
      // data in a log that is already in a snapshot, but rolling after the snapshots could lose
      // data.
      // A better approach would be to roll the log on each RS in the same global procedure as
      // the snapshot.
      LOG.info("Execute roll log procedure for full backup ...");

      // Gather the bulk loads being tracked by the system, which can be deleted (since their data
      // will be part of the snapshot being taken). We gather this list before taking the actual
      // snapshots for the same reason as the log rolls.
      List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);

      BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);

      newTimestamps = backupManager.readRegionServerLastLogRollResult();

      // SNAPSHOT_TABLES:
      backupInfo.setPhase(BackupPhase.SNAPSHOT);
      for (TableName tableName : tableList) {
        String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
          + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
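        // e.g. a resulting name (hypothetical timestamp): snapshot_1699000000000_default_mytable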

        snapshotTable(admin, tableName, snapshotName);
        backupInfo.setSnapshotName(tableName, snapshotName);
      }

      // SNAPSHOT_COPY:
      // copy the snapshots to the backup destination
      LOG.debug("Snapshot copy for " + backupId);
      snapshotCopy(backupInfo);
      // Update the incremental backup table set
      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());

      // BACKUP_COMPLETE:
      // Set the overall backup state to complete. After this checkpoint, the backup is allowed
      // to finish even if a cancel request comes in.
      backupInfo.setState(BackupState.COMPLETE);
      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      backupManager
        .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.FULL, conf);
    } catch (Exception e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException: ",
        BackupType.FULL, conf);
      throw new IOException(e);
    }
  }

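  /**
   * Takes a snapshot of the given table, retrying on failure. The retry policy comes from the
   * configuration via the statically imported {@code BACKUP_MAX_ATTEMPTS_KEY} and
   * {@code BACKUP_ATTEMPTS_PAUSE_MS_KEY} keys. A minimal sketch of overriding the defaults
   * (values are illustrative):
   *
   * <pre>{@code
   * conf.setInt(BACKUP_MAX_ATTEMPTS_KEY, 5); // try up to 5 times
   * conf.setInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, 10_000); // wait 10s between attempts
   * }</pre>
   */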
  protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
    throws IOException {
    int maxAttempts = conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
    int pause = conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS);
    int attempts = 0;

    while (attempts++ < maxAttempts) {
      try {
        admin.snapshot(snapshotName, tableName);
        return;
      } catch (IOException ee) {
        LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName
          + ", sleeping for " + pause + "ms", ee);
        if (attempts < maxAttempts) {
          try {
            Thread.sleep(pause);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
          }
        }
      }
    }
    throw new IOException("Failed to snapshot table " + tableName);
  }
}