/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Full table backup implementation.
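 * <p>
 * A typical invocation goes through the backup admin API rather than this class directly. A
 * minimal sketch (assuming an open {@code Connection}; the table name and root path below are
 * illustrative):
 *
 * <pre>
 * try (BackupAdmin backupAdmin = new BackupAdminImpl(conn)) {
 *   BackupRequest request = new BackupRequest.Builder().withBackupType(BackupType.FULL)
 *     .withTableList(Collections.singletonList(TableName.valueOf("t1")))
 *     .withTargetRootDir("hdfs://namenode:8020/backup").build();
 *   String backupId = backupAdmin.backupTables(request);
 * }
 * </pre>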
 */
@InterfaceAudience.Private
public class FullTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(FullTableBackupClient.class);

  public FullTableBackupClient() {
  }

  public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
    throws IOException {
    super(conn, backupId, request);
  }
  /**
   * Do snapshot copy.
   * @param backupInfo backup info
   * @throws Exception if the export of any snapshot fails
   */
  protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
    LOG.info("Snapshot copy is starting.");

    // set overall backup phase: snapshot_copy
    backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);

    // call ExportSnapshot to copy files based on hbase snapshot for backup
    // ExportSnapshot only supports exporting a single snapshot, so we loop over the tables
    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);

    // number of snapshots matches number of tables
    int numOfSnapshots = backupInfo.getSnapshotNames().size();

    LOG.debug("There are " + numOfSnapshots + " snapshots to be copied.");

    for (TableName table : backupInfo.getTables()) {
      // Currently we weight each sub copy task equally, one per table snapshot; in the future
      // we could use the real file sizes to compute the progress percentage.
      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
      int res;
      ArrayList<String> argsList = new ArrayList<>();
      argsList.add("-snapshot");
      argsList.add(backupInfo.getSnapshotName(table));
      argsList.add("-copy-to");
      argsList.add(backupInfo.getTableBackupDir(table));
      if (backupInfo.getBandwidth() > -1) {
        argsList.add("-bandwidth");
        argsList.add(String.valueOf(backupInfo.getBandwidth()));
      }
      if (backupInfo.getWorkers() > -1) {
        argsList.add("-mappers");
        argsList.add(String.valueOf(backupInfo.getWorkers()));
      }

      String[] args = argsList.toArray(new String[0]);
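      // The assembled args mirror an ExportSnapshot command line, for example (illustrative
      // values): -snapshot snapshot_1700000000000_default_t1 -copy-to hdfs://.../default/t1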

      String jobname = "Full-Backup_" + backupInfo.getBackupId() + "_" + table.getNameAsString();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting snapshot copy job name to: " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
      res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);

      // if one snapshot export fails, do not continue with the remaining snapshots
      if (res != 0) {
        LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + ".");

        throw new IOException("Failed to export snapshot " + args[1] + " to " + args[3]
          + " with return code " + res);
      }

      conf.unset(JOB_NAME_CONF_KEY);
      LOG.info("Snapshot copy " + args[1] + " finished.");
    }
  }

  /**
   * Backup request execution.
   * @throws IOException if the execution of the backup fails
   */
  @Override
  public void execute() throws IOException {
    try (Admin admin = conn.getAdmin()) {
      // Begin BACKUP
      beginBackup(backupManager, backupInfo);

      // do snapshot for full table backup
      String savedStartCode = backupManager.readBackupStartCode();
      boolean firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
      if (firstBackup) {
        // This is our first backup. Write a marker to the backup system table so that the
        // WALs are retained while we do the backup.
        backupManager.writeBackupStartCode(0L);
      }
      // We roll the logs here before we take the snapshots. The logs may then contain data
      // that is duplicated in the snapshots, but rolling after the snapshots could lose data.
      // A better approach would be to roll the log on each RS in the same global procedure as
      // the snapshot.
      LOG.info("Execute roll log procedure for full backup ...");

      Map<String, String> props = new HashMap<>();
      props.put("backupRoot", backupInfo.getBackupRootDir());
      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);

      newTimestamps = backupManager.readRegionServerLastLogRollResult();
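      // Each entry maps a region server to the timestamp of its latest log roll; these
      // timestamps bound the WALs that subsequent incremental backups will need to read.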

      // SNAPSHOT_TABLES:
      backupInfo.setPhase(BackupPhase.SNAPSHOT);
      for (TableName tableName : tableList) {
        String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
          + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();

        snapshotTable(admin, tableName, snapshotName);
        backupInfo.setSnapshotName(tableName, snapshotName);
      }

      // SNAPSHOT_COPY:
      // do snapshot copy
      LOG.debug("snapshot copy for " + backupId);
      snapshotCopy(backupInfo);
      // Update the incremental backup table set
      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());

      // BACKUP_COMPLETE:
      // Set the overall backup state to complete. After this checkpoint, the backup is
      // allowed to finish even if a cancel request arrives.
      backupInfo.setState(BackupState.COMPLETE);
      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
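      // The new start code is the smallest last-roll timestamp across all region servers:
      // WALs written before it are already covered by this full backup.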
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      // backup complete
      completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);
    } catch (Exception e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException: ",
        BackupType.FULL, conf);
      throw new IOException(e);
    }
  }

  protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
    throws IOException {
    int maxAttempts = conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
    int pause = conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS);
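    // The pause between attempts is constant (no exponential backoff); both knobs come from
    // the BACKUP_MAX_ATTEMPTS_KEY and BACKUP_ATTEMPTS_PAUSE_MS_KEY configuration properties.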
    int attempts = 0;

    while (attempts++ < maxAttempts) {
      try {
        admin.snapshot(snapshotName, tableName);
        return;
      } catch (IOException ee) {
        LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName
          + ", sleeping for " + pause + "ms", ee);
        if (attempts < maxAttempts) {
          try {
            Thread.sleep(pause);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
          }
        }
      }
    }
    throw new IOException("Failed to snapshot table " + tableName);
  }
}