/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for backup operations. Concrete implementations for full and incremental backups are
 * delegated to the corresponding subclasses: {@link FullTableBackupClient} and
 * {@link IncrementalTableBackupClient}.
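 * <p>
 * A minimal usage sketch, assuming a live {@code Connection conn}, a generated {@code backupId},
 * and a table list {@code tables} prepared by the caller (in practice backups are normally driven
 * through {@code BackupAdminImpl} rather than instantiated directly):
 *
 * <pre>{@code
 * BackupRequest request = new BackupRequest.Builder().withBackupType(BackupType.FULL)
 *   .withTableList(tables).withTargetRootDir(targetRootDir).build();
 * TableBackupClient client = new FullTableBackupClient(conn, backupId, request);
 * client.execute();
 * }</pre>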
 */
@InterfaceAudience.Private
public abstract class TableBackupClient {

  /** Configuration key for overriding the backup client implementation class. */
  public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class";

  /** Configuration key used by tests to inject a failure at a given {@link Stage}. */
  public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage";

  private static final Logger LOG = LoggerFactory.getLogger(TableBackupClient.class);

  protected Configuration conf;
  protected Connection conn;
  protected String backupId;
  protected List<TableName> tableList;
  protected Map<String, Long> newTimestamps = null;

  protected BackupManager backupManager;
  protected BackupInfo backupInfo;
  protected FileSystem fs;

  public TableBackupClient() {
  }

  public TableBackupClient(final Connection conn, final String backupId, BackupRequest request)
    throws IOException {
    init(conn, backupId, request);
  }

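  /**
   * Initializes the client: selects a full or incremental {@link BackupManager} based on the
   * request type, creates the {@link BackupInfo} record, and starts a new backup session.
   */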
  public void init(final Connection conn, final String backupId, BackupRequest request)
    throws IOException {
    if (request.getBackupType() == BackupType.FULL) {
      backupManager = new BackupManager(conn, conn.getConfiguration());
    } else {
      backupManager = new IncrementalBackupManager(conn, conn.getConfiguration());
    }
    this.backupId = backupId;
    this.tableList = request.getTableList();
    this.conn = conn;
    this.conf = conn.getConfiguration();
    this.fs = CommonFSUtils.getCurrentFileSystem(conf);
    backupInfo = backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
      request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth(),
      request.getNoChecksumVerify());
    if (tableList == null || tableList.isEmpty()) {
      this.tableList = new ArrayList<>(backupInfo.getTables());
    }
    // Start new session
    backupManager.startBackupSession();
  }

  /**
   * Begin the overall backup.
   * @param backupManager backup manager instance
   * @param backupInfo    backup info
   * @throws IOException exception
   */
  protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
    throws IOException {

    BackupSystemTable.snapshot(conn);
    backupManager.setBackupInfo(backupInfo);
    // set the start timestamp of the overall backup
    long startTs = EnvironmentEdgeManager.currentTime();
    backupInfo.setStartTs(startTs);
    // set overall backup status: ongoing
    backupInfo.setState(BackupState.RUNNING);
    backupInfo.setPhase(BackupPhase.REQUEST);
    LOG.info("Backup " + backupInfo.getBackupId() + " started at " + startTs + ".");

    backupManager.updateBackupInfo(backupInfo);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Backup session " + backupInfo.getBackupId() + " has been started.");
    }
  }

  protected String getMessage(Exception e) {
    String msg = e.getMessage();
    if (msg == null || msg.isEmpty()) {
      msg = e.getClass().getName();
    }
    return msg;
  }

  /**
   * Delete the HBase snapshots created for this backup.
   * @param conn       connection to the cluster
   * @param backupInfo backup info
   * @param conf       configuration
   * @throws IOException exception
   */
  protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo,
    Configuration conf) throws IOException {
    LOG.debug("Trying to delete snapshots for full backup.");
    try (Admin admin = conn.getAdmin()) {
      for (String snapshotName : backupInfo.getSnapshotNames()) {
        if (snapshotName == null) {
          continue;
        }
        LOG.debug("Trying to delete snapshot: " + snapshotName);
        admin.deleteSnapshot(snapshotName);
        LOG.debug("Deleting the snapshot " + snapshotName + " for backup "
          + backupInfo.getBackupId() + " succeeded.");
      }
    }
  }

  /**
   * Clean up directories with prefix "exportSnapshot-", which are generated when exporting
   * snapshots.
   * @param conf configuration
   * @throws IOException exception
   */
  protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
    Path stagingDir = new Path(
      conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory().toString()));
    FileStatus[] files = CommonFSUtils.listStatus(fs, stagingDir);
    if (files == null) {
      return;
    }
    for (FileStatus file : files) {
      if (file.getPath().getName().startsWith("exportSnapshot-")) {
        LOG.debug("Deleting log files of snapshot export: " + file.getPath().getName());
        if (!CommonFSUtils.delete(fs, file.getPath(), true)) {
          LOG.warn("Cannot delete " + file.getPath());
        }
      }
    }
  }

  /**
   * Clean up the uncompleted data at the target directory if the ongoing backup has already
   * entered the copy phase.
   * @param backupInfo backup info
   * @param conf       configuration
   */
  protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
    try {
      // clean up the uncompleted data at the target directory if the ongoing backup has already
      // entered the copy phase
      LOG.debug("Trying to clean up target dir. Current backup phase: " + backupInfo.getPhase());
      if (
        backupInfo.getPhase().equals(BackupPhase.SNAPSHOTCOPY)
          || backupInfo.getPhase().equals(BackupPhase.INCREMENTAL_COPY)
          || backupInfo.getPhase().equals(BackupPhase.STORE_MANIFEST)
      ) {
        FileSystem outputFs = FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf);

        // treat one backup as a transaction: clean up data that has been partially copied at
        // table level
        for (TableName table : backupInfo.getTables()) {
          Path targetDirPath = new Path(HBackupFileSystem
            .getTableBackupDir(backupInfo.getBackupRootDir(), backupInfo.getBackupId(), table));
          if (outputFs.delete(targetDirPath, true)) {
            LOG.debug(
              "Cleaning up uncompleted backup data at " + targetDirPath.toString() + " done.");
          } else {
            LOG.debug("No data has been copied to " + targetDirPath.toString() + ".");
          }

          Path tableDir = targetDirPath.getParent();
          FileStatus[] backups = CommonFSUtils.listStatus(outputFs, tableDir);
          if (backups == null || backups.length == 0) {
            outputFs.delete(tableDir, true);
            LOG.debug(tableDir.toString() + " is empty, removing it.");
          }
        }
      }

    } catch (IOException e1) {
      LOG.error("Cleaning up uncompleted backup data of " + backupInfo.getBackupId() + " at "
        + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".");
    }
  }

  /**
   * Fail the overall backup.
   * @param conn          connection to the cluster
   * @param backupInfo    backup info
   * @param backupManager backup manager instance
   * @param e             exception that caused the failure
   * @param msg           error message prefix for logging
   * @param type          backup type
   * @param conf          configuration
   * @throws IOException exception
   */
  protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager,
    Exception e, String msg, BackupType type, Configuration conf) throws IOException {
    try {
      LOG.error(msg + getMessage(e), e);
      // If this is a cancel exception, then we've already cleaned.
      // set the failure timestamp of the overall backup
      backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
      // set failure message
      backupInfo.setFailedMsg(e.getMessage());
      // set overall backup status: failed
      backupInfo.setState(BackupState.FAILED);
      // compose the backup failed data
      String backupFailedData = "BackupId=" + backupInfo.getBackupId() + ",startts="
        + backupInfo.getStartTs() + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase="
        + backupInfo.getPhase() + ",failedmessage=" + backupInfo.getFailedMsg();
      LOG.error(backupFailedData);
      cleanupAndRestoreBackupSystem(conn, backupInfo, conf);
      // If the backup session has been updated to the FAILED state, recovery
      // has already been processed.
      backupManager.updateBackupInfo(backupInfo);
      backupManager.finishBackupSession();
      LOG.error("Backup " + backupInfo.getBackupId() + " failed.");
    } catch (IOException ee) {
      LOG.error("Please run the backup repair tool manually to restore backup system integrity.");
      throw ee;
    }
  }

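  /**
   * Restores the backup system table from its snapshot and removes any artifacts left by the
   * failed backup: HBase snapshots, export-snapshot logs, and partially copied target data.
   */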
  public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo,
    Configuration conf) throws IOException {
    BackupType type = backupInfo.getType();
    // for a full backup, delete any HBase snapshots that have already been taken,
    // and also clean up export snapshot log files if they exist
    if (type == BackupType.FULL) {
      deleteSnapshots(conn, backupInfo, conf);
      cleanupExportSnapshotLog(conf);
    }
    BackupSystemTable.restoreFromSnapshot(conn);
    BackupSystemTable.deleteSnapshot(conn);
    // clean up the uncompleted data at the target directory if the ongoing backup has already
    // entered the copy phase
    // For incremental backup, DistCp logs will be cleaned with the targetDir.
    cleanupTargetDir(backupInfo, conf);
  }

  /**
   * Creates a manifest based on the provided info, and stores it in the backup-specific directory.
   * @param backupInfo The current backup info
   * @param type       backup type
   * @param conf       configuration
   * @throws IOException exception
   */
  protected void addManifest(BackupInfo backupInfo, BackupType type, Configuration conf)
    throws IOException {
    // set the overall backup phase: store manifest
    backupInfo.setPhase(BackupPhase.STORE_MANIFEST);

    BackupManifest manifest = new BackupManifest(backupInfo);
    if (type == BackupType.INCREMENTAL) {
      // set the table region server start and end timestamps for incremental backup
      manifest.setIncrTimestampMap(backupInfo.getIncrTimestampMap());
    }
    List<BackupImage> ancestors = getAncestors(backupInfo);
    for (BackupImage image : ancestors) {
      manifest.addDependentImage(image);
    }
    manifest.store(conf);
  }

  /**
   * Gets the direct ancestors of the backup currently being created.
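   * <p>
   * A worked example (hypothetical history, newest first): given images {@code INC3}, {@code INC2}
   * and {@code FULL1} covering tables {@code t1,t2}, a new incremental backup of {@code t1} gets
   * ancestors {@code INC3}, {@code INC2} and {@code FULL1}, because every incremental image seen
   * before the covering full backup is a dependency.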
   * @param backupInfo The backup info for the backup being created
   * @return the list of direct ancestor backup images; empty for full backups
   */
  protected List<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
    LOG.debug("Getting the direct ancestors of the current backup {}", backupInfo.getBackupId());

    // Full backups do not have ancestors
    if (backupInfo.getType() == BackupType.FULL) {
      LOG.debug("Current backup is a full backup, no direct ancestor for it.");
      return Collections.emptyList();
    }

    List<BackupImage> ancestors = new ArrayList<>();
    Set<TableName> tablesToCover = new HashSet<>(backupInfo.getTables());

    // Go over the backup history list from newest to oldest
    List<BackupInfo> allHistoryList = backupManager.getBackupHistory(true);
    for (BackupInfo backup : allHistoryList) {
      // If the image has a different rootDir, it cannot be an ancestor.
      if (!Objects.equals(backup.getBackupRootDir(), backupInfo.getBackupRootDir())) {
        continue;
      }

      BackupImage.Builder builder = BackupImage.newBuilder();
      BackupImage image = builder.withBackupId(backup.getBackupId()).withType(backup.getType())
        .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
        .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();

      // The ancestors consist of the most recent FULL backups that cover the list of tables
      // required in the new backup and all INCREMENTAL backups that came after one of those FULL
      // backups.
      if (backup.getType().equals(BackupType.INCREMENTAL)) {
        ancestors.add(image);
        LOG.debug("Dependent incremental backup image: {BackupID={}}", image.getBackupId());
      } else {
        if (tablesToCover.removeAll(new HashSet<>(image.getTableNames()))) {
          ancestors.add(image);
          LOG.debug("Dependent full backup image: {BackupID={}}", image.getBackupId());

          if (tablesToCover.isEmpty()) {
            LOG.debug("Got {} ancestors for the current backup.", ancestors.size());
            return Collections.unmodifiableList(ancestors);
          }
        }
      }
    }

    throw new IllegalStateException(
      "Unable to find full backup that contains tables: " + tablesToCover);
  }

  /**
   * Get the backup request metadata as a string.
   * @param backupInfo backup info
   * @return metadata string
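   *         <p>
   *         For example (a hypothetical full backup of two tables):
   *         {@code type=FULL,tablelist=ns1:t1;ns1:t2,targetRootDir=hdfs://host/backup}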
   */
  protected String obtainBackupMetaDataStr(BackupInfo backupInfo) {
    StringBuilder sb = new StringBuilder();
    sb.append("type=").append(backupInfo.getType()).append(",tablelist=");
    for (TableName table : backupInfo.getTables()) {
      sb.append(table).append(";");
    }
    if (sb.lastIndexOf(";") > 0) {
      sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
    }
    sb.append(",targetRootDir=").append(backupInfo.getBackupRootDir());

    return sb.toString();
  }

  /**
   * Complete the overall backup.
   * @param conn       connection to the cluster
   * @param backupInfo backup info
   * @param type       backup type
   * @param conf       configuration
   * @throws IOException exception
   */
  protected void completeBackup(final Connection conn, BackupInfo backupInfo, BackupType type,
    Configuration conf) throws IOException {
    // set the complete timestamp of the overall backup
    backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
    // set overall backup status: complete
    backupInfo.setState(BackupState.COMPLETE);
    backupInfo.setProgress(100);
    // add and store the manifest for the backup
    addManifest(backupInfo, type, conf);

    // compose the backup complete data
    String backupCompleteData =
      obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs() + ",completets="
        + backupInfo.getCompleteTs() + ",bytescopied=" + backupInfo.getTotalBytesCopied();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Backup " + backupInfo.getBackupId() + " finished: " + backupCompleteData);
    }

    // when a full backup is done:
    // - delete the HBase snapshots
    // - clean up directories with prefix "exportSnapshot-", which are generated when exporting
    // snapshots
    // incremental backups use DistCp, which handles cleaning up its own directories
    if (type == BackupType.FULL) {
      deleteSnapshots(conn, backupInfo, conf);
      cleanupExportSnapshotLog(conf);
    }
    BackupSystemTable.deleteSnapshot(conn);
    backupManager.updateBackupInfo(backupInfo);

    // Finish active session
    backupManager.finishBackupSession();

    LOG.info("Backup " + backupInfo.getBackupId() + " completed.");
  }

  /**
   * Backup request execution.
   * @throws IOException if the execution of the backup fails
   */
  public abstract void execute() throws IOException;

  /** Returns the {@link Stage} configured for failure injection in tests (stage_0 by default). */
  protected Stage getTestStage() {
    return Stage.valueOf("stage_" + conf.getInt(BACKUP_TEST_MODE_STAGE, 0));
  }

  /** Throws an IOException if the configured test stage matches the given stage. */
  protected void failStageIf(Stage stage) throws IOException {
    Stage current = getTestStage();
    if (current == stage) {
      throw new IOException("Failed stage " + stage + " in testing");
    }
  }

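  /**
   * Failure-injection points used in tests, selected via the {@link #BACKUP_TEST_MODE_STAGE}
   * configuration key. For example, a hypothetical test setup might call
   * {@code conf.setInt(BACKUP_TEST_MODE_STAGE, 2)} to make the client fail at {@code stage_2}.
   */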
  public enum Stage {
    stage_0,
    stage_1,
    stage_2,
    stage_3,
    stage_4
  }
}