001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.master;
019
020import java.io.IOException;
021import java.time.Duration;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.EnumSet;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.backup.BackupInfo;
034import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
035import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
036import org.apache.hadoop.hbase.backup.impl.BackupManager;
037import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
038import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.master.HMaster;
042import org.apache.hadoop.hbase.master.MasterServices;
043import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
044import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
045import org.apache.hadoop.hbase.net.Address;
046import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
052import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
053
054/**
055 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
056 * before deleting it when its TTL is over.
057 */
058@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
059public class BackupLogCleaner extends BaseLogCleanerDelegate {
060  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
061  private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis();
062  static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms";
063
064  private boolean stopped = false;
065  private Connection conn;
066
067  public BackupLogCleaner() {
068  }
069
070  @Override
071  public void init(Map<String, Object> params) {
072    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
073    if (master != null) {
074      conn = master.getConnection();
075      if (getConf() == null) {
076        super.setConf(conn.getConfiguration());
077      }
078    }
079    if (conn == null) {
080      try {
081        conn = ConnectionFactory.createConnection(getConf());
082      } catch (IOException ioe) {
083        throw new RuntimeException("Failed to create connection", ioe);
084      }
085    }
086  }
087
088  /**
089   * Calculates the timestamp boundary up to which all backup roots have already included the WAL.
090   * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
091   * backups.
092   * @param backups  all completed or running backups to use for the calculation of the boundary
093   * @param tsBuffer a buffer (in ms) to lower the boundary for the default bound
094   */
095  protected static BackupBoundaries calculatePreservationBoundary(List<BackupInfo> backups,
096    long tsBuffer) {
097    if (LOG.isDebugEnabled()) {
098      LOG.debug(
099        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
100          + "Checking WALs against {} backups: {}",
101        backups.size(),
102        backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
103    }
104
105    // This map tracks, for every backup root, the most recent (= highest timestamp) completed
106    // backup, or if there is no such one, the currently running backup (if any)
107    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
108    for (BackupInfo backup : backups) {
109      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
110      if (existingEntry == null || existingEntry.getState() == BackupState.RUNNING) {
111        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
112      }
113    }
114
115    if (LOG.isDebugEnabled()) {
116      LOG.debug("WAL cleanup time-boundary using info from: {}. ",
117        newestBackupPerRootDir.entrySet().stream()
118          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
119          .collect(Collectors.joining(", ")));
120    }
121
122    BackupBoundaries.BackupBoundariesBuilder builder = BackupBoundaries.builder(tsBuffer);
123    newestBackupPerRootDir.values().forEach(builder::update);
124    BackupBoundaries boundaries = builder.build();
125
126    if (LOG.isDebugEnabled()) {
127      LOG.debug("Boundaries defaultBoundary: {}", boundaries.getDefaultBoundary());
128      for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
129        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
130          entry.getValue());
131      }
132    }
133
134    return boundaries;
135  }
136
137  @Override
138  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
139    List<FileStatus> filteredFiles = new ArrayList<>();
140
141    // all members of this class are null if backup is disabled,
142    // so we cannot filter the files
143    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
144      LOG.debug("Backup is not enabled. Check your {} setting",
145        BackupRestoreConstants.BACKUP_ENABLE_KEY);
146      return files;
147    }
148
149    BackupBoundaries boundaries;
150    try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
151      long tsBuffer = getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT);
152      List<BackupInfo> backupHistory = sysTable.getBackupHistory(
153        i -> EnumSet.of(BackupState.COMPLETE, BackupState.RUNNING).contains(i.getState()));
154      boundaries = calculatePreservationBoundary(backupHistory, tsBuffer);
155    } catch (IOException ex) {
156      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
157        ex.getMessage(), ex);
158      return Collections.emptyList();
159    }
160    for (FileStatus file : files) {
161      if (canDeleteFile(boundaries, file.getPath())) {
162        filteredFiles.add(file);
163      }
164    }
165
166    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
167      filteredFiles.size());
168    return filteredFiles;
169  }
170
171  @Override
172  public void setConf(Configuration config) {
173    // If backup is disabled, keep all members null
174    super.setConf(config);
175    if (
176      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
177        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
178    ) {
179      LOG.warn("Backup is disabled - allowing all wals to be deleted");
180    }
181  }
182
183  @Override
184  public void stop(String why) {
185    if (!this.stopped) {
186      this.stopped = true;
187      LOG.info("Stopping BackupLogCleaner");
188    }
189  }
190
191  @Override
192  public boolean isStopped() {
193    return this.stopped;
194  }
195
196  protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) {
197    if (isHMasterWAL(path)) {
198      return true;
199    }
200    return boundaries.isDeletable(path);
201  }
202
203  private static boolean isHMasterWAL(Path path) {
204    String fn = path.getName();
205    return fn.startsWith(WALProcedureStore.LOG_PREFIX)
206      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
207      || path.toString().contains("/%s/".formatted(MasterRegionFactory.MASTER_STORE_DIR));
208  }
209}