/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.master;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
 * before deleting it when its TTL is over.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
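  // Buffer (in milliseconds) passed to BackupBoundaries when computing per-server WAL cleanup
  // boundaries; defaults to one hour.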
  private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis();
  static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms";

  private boolean stopped = false;
  private Connection conn;

  public BackupLogCleaner() {
  }

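  /**
   * Obtains a cluster connection: reuses the active master's connection when this cleaner runs
   * inside the master, otherwise creates a new one from the configuration.
   */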
  @Override
  public void init(Map<String, Object> params) {
    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
    if (master != null) {
      conn = master.getConnection();
      if (getConf() == null) {
        super.setConf(conn.getConfiguration());
      }
    }
    if (conn == null) {
      try {
        conn = ConnectionFactory.createConnection(getConf());
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create connection", ioe);
      }
    }
  }

  /**
   * Calculates, per region server, the timestamp boundary up to which all backup roots have
   * already included the WALs. That is, WALs with an equal or lower (= older) timestamp are no
   * longer needed for future incremental backups.
   */
  private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
    throws IOException {
    List<BackupInfo> backups = sysTable.getBackupHistory(true);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
          + "Checking WALs against {} backups: {}",
        backups.size(),
        backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
    }

    // For every backup root, track the most recently created backup (= highest start timestamp)
    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
    for (BackupInfo backup : backups) {
      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
      if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("WAL cleanup time-boundary using info from: {}. ",
        newestBackupPerRootDir.entrySet().stream()
          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
          .collect(Collectors.joining(", ")));
    }

    BackupBoundaries.BackupBoundariesBuilder builder =
      BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT));
    for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
      long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir()));
      // Iterate over all tables in the timestamp map, which contains all tables covered in the
      // backup root, not just the tables included in that specific backup (which could be a subset)
      for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
        for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
          .entrySet()) {
          builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode);
        }
      }
    }

    BackupBoundaries boundaries = builder.build();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode());
      for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
          entry.getValue());
      }
    }

    return boundaries;
  }

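  /**
   * Returns the WAL files that are no longer needed by any future incremental backup and can
   * therefore be deleted. If backup is disabled, all files are returned unfiltered; if the backup
   * history cannot be read, no files are returned so that nothing is deleted prematurely.
   */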
  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> filteredFiles = new ArrayList<>();

    // all members of this class are null if backup is disabled,
    // so we cannot filter the files
    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
      LOG.debug("Backup is not enabled. Check your {} setting",
        BackupRestoreConstants.BACKUP_ENABLE_KEY);
      return files;
    }

    BackupBoundaries boundaries;
    try {
      try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
        boundaries = serverToPreservationBoundaryTs(sysTable);
      }
    } catch (IOException ex) {
      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
      return Collections.emptyList();
    }
    for (FileStatus file : files) {
      if (canDeleteFile(boundaries, file.getPath())) {
        filteredFiles.add(file);
      }
    }

    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
      filteredFiles.size());
    return filteredFiles;
  }

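  /**
   * Stores the configuration and warns if backup is disabled, since in that case every WAL is
   * considered deletable by this cleaner.
   */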
  @Override
  public void setConf(Configuration config) {
    // If backup is disabled, keep all members null
    super.setConf(config);
    if (
      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      LOG.warn("Backup is disabled - allowing all WALs to be deleted");
    }
  }

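  /** Marks this cleaner as stopped; subsequent calls are no-ops. */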
  @Override
  public void stop(String why) {
    if (!this.stopped) {
      this.stopped = true;
      LOG.info("Stopping BackupLogCleaner");
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

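  /**
   * Returns whether the given WAL can be deleted: HMaster-owned WALs are never part of a backup
   * and are always deletable; all other WALs are checked against the computed backup boundaries.
   */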
  protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) {
    if (isHMasterWAL(path)) {
      return true;
    }
    return boundaries.isDeletable(path);
  }

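  /**
   * Returns whether the given path belongs to the HMaster itself: a procedure-store WAL, an
   * archived master region WAL, or a file under the master store directory.
   */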
  private static boolean isHMasterWAL(Path path) {
    String fn = path.getName();
    return fn.startsWith(WALProcedureStore.LOG_PREFIX)
      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
      || path.toString().contains("/%s/".formatted(MasterRegionFactory.MASTER_STORE_DIR));
  }
}