/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.master;

import static org.apache.hadoop.hbase.backup.BackupInfo.withState;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
/**
 * Implementation of a log cleaner that checks whether a WAL is still needed for a scheduled
 * incremental backup before allowing it to be deleted once its TTL has expired.
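 * <p>
 * A minimal configuration sketch (illustrative only; the {@code TimeToLiveLogCleaner} entry and
 * plugin ordering are assumptions, and backup-enabled deployments may wire this cleaner in
 * through other means):
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.backup.enable&lt;/name&gt;
 *   &lt;value&gt;true&lt;/value&gt;
 * &lt;/property&gt;
 * &lt;property&gt;
 *   &lt;name&gt;hbase.master.logcleaner.plugins&lt;/name&gt;
 *   &lt;value&gt;org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,
 *       org.apache.hadoop.hbase.backup.master.BackupLogCleaner&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>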
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
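  // Safety margin handed to BackupBoundaries.builder() below when computing per-server WAL
  // cleanup boundaries; defaults to one hour, tunable via TS_BUFFER_KEY.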
  private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis();
  static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms";

  private boolean stopped = false;
  private Connection conn;

  public BackupLogCleaner() {
  }

  @Override
  public void init(Map<String, Object> params) {
    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
    if (master != null) {
      conn = master.getConnection();
      if (getConf() == null) {
        super.setConf(conn.getConfiguration());
      }
    }
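    // Fall back to creating our own connection when none was provided by the master.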
    if (conn == null) {
      try {
        conn = ConnectionFactory.createConnection(getConf());
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create connection", ioe);
      }
    }
  }

  /**
   * Calculates, per region server, the timestamp boundary up to which all backup roots have
   * already included the WALs. I.e., WALs with an equal or lower (= older) timestamp are no
   * longer needed for future incremental backups.
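   * <p>
   * Illustrative example: if backup root A's newest backup covers WALs of server S up to
   * timestamp 100 while backup root B's newest backup only reaches timestamp 80, then the
   * boundary for S is 80; newer WALs must be kept for B's next incremental backup.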
   */
  private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
    throws IOException {
    List<BackupInfo> backups = sysTable.getBackupHistory(withState(BackupState.COMPLETE));
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
          + "Checking WALs against {} backups: {}",
        backups.size(),
        backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
    }

    // This map tracks, for every backup root, the most recently created backup (= highest start
    // timestamp)
    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
    for (BackupInfo backup : backups) {
      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
      if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("WAL cleanup time-boundary using info from: {}",
        newestBackupPerRootDir.entrySet().stream()
          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
          .collect(Collectors.joining(", ")));
    }

    BackupBoundaries.BackupBoundariesBuilder builder =
      BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT));
    for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
      long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir()));
      // Iterate over all tables in the timestamp map, which contains all tables covered in the
      // backup root, not just the tables included in that specific backup (which could be a subset)
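      // (e.g. if this root has ever backed up T1 and T2 but its newest backup only included T1,
      // the map still carries T2's older timestamps, so T2's WAL needs are respected as well)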
      for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
        for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
          .entrySet()) {
          builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode);
        }
      }
    }

    BackupBoundaries boundaries = builder.build();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode());
      for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
          entry.getValue());
      }
    }

    return boundaries;
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> filteredFiles = new ArrayList<>();

    // all members of this class are null if backup is disabled,
    // so we cannot filter the files
    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
      LOG.debug("Backup is not enabled. Check your {} setting",
        BackupRestoreConstants.BACKUP_ENABLE_KEY);
      return files;
    }

    BackupBoundaries boundaries;
    try {
      try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
        boundaries = serverToPreservationBoundaryTs(sysTable);
      }
    } catch (IOException ex) {
      LOG.error("Failed to analyze backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
      return Collections.emptyList();
    }
    for (FileStatus file : files) {
      if (canDeleteFile(boundaries, file.getPath())) {
        filteredFiles.add(file);
      }
    }

    LOG.info("Total files: {}, filtered files: {}", IterableUtils.size(files),
      filteredFiles.size());
    return filteredFiles;
  }

  @Override
  public void setConf(Configuration config) {
    // If backup is disabled, keep all members null
    super.setConf(config);
    if (
      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      LOG.warn("Backup is disabled - allowing all WALs to be deleted");
    }
  }

  @Override
  public void stop(String why) {
    if (!this.stopped) {
      this.stopped = true;
      LOG.info("Stopping BackupLogCleaner");
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) {
    if (isHMasterWAL(path)) {
      return true;
    }
    return boundaries.isDeletable(path);
  }

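  // WALs written by the master itself (procedure-store logs and master store region WALs) are
  // not part of table backups, so they are always safe for this cleaner to release.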
  private static boolean isHMasterWAL(Path path) {
    String fn = path.getName();
    return fn.startsWith(WALProcedureStore.LOG_PREFIX)
      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
      || path.toString().contains("/%s/".formatted(MasterRegionFactory.MASTER_STORE_DIR));
  }
}