001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.master;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.backup.BackupInfo;
031import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
032import org.apache.hadoop.hbase.backup.impl.BackupManager;
033import org.apache.hadoop.hbase.backup.util.BackupUtils;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.MasterServices;
038import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
041import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
047import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
048
049/**
050 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
051 * before deleting it when its TTL is over.
052 */
053@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
054public class BackupLogCleaner extends BaseLogCleanerDelegate {
055  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
056
057  private boolean stopped = false;
058  private Connection conn;
059
060  public BackupLogCleaner() {
061  }
062
063  @Override
064  public void init(Map<String, Object> params) {
065    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
066    if (master != null) {
067      conn = master.getConnection();
068      if (getConf() == null) {
069        super.setConf(conn.getConfiguration());
070      }
071    }
072    if (conn == null) {
073      try {
074        conn = ConnectionFactory.createConnection(getConf());
075      } catch (IOException ioe) {
076        throw new RuntimeException("Failed to create connection", ioe);
077      }
078    }
079  }
080
081  private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> backups)
082    throws IOException {
083    Map<Address, Long> serverAddressToLastBackupMap = new HashMap<>();
084
085    Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
086    for (BackupInfo backupInfo : backups) {
087      for (TableName table : backupInfo.getTables()) {
088        tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
089        if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
090          tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
091          for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
092            .entrySet()) {
093            serverAddressToLastBackupMap.put(Address.fromString(entry.getKey()), entry.getValue());
094          }
095        }
096      }
097    }
098
099    return serverAddressToLastBackupMap;
100  }
101
102  @Override
103  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
104    List<FileStatus> filteredFiles = new ArrayList<>();
105
106    // all members of this class are null if backup is disabled,
107    // so we cannot filter the files
108    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
109      LOG.debug("Backup is not enabled. Check your {} setting",
110        BackupRestoreConstants.BACKUP_ENABLE_KEY);
111      return files;
112    }
113
114    Map<Address, Long> addressToLastBackupMap;
115    try {
116      try (BackupManager backupManager = new BackupManager(conn, getConf())) {
117        addressToLastBackupMap =
118          getServersToOldestBackupMapping(backupManager.getBackupHistory(true));
119      }
120    } catch (IOException ex) {
121      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
122        ex.getMessage(), ex);
123      return Collections.emptyList();
124    }
125    for (FileStatus file : files) {
126      String fn = file.getPath().getName();
127      if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
128        filteredFiles.add(file);
129        continue;
130      }
131
132      try {
133        Address walServerAddress =
134          Address.fromString(BackupUtils.parseHostNameFromLogFile(file.getPath()));
135        long walTimestamp = AbstractFSWALProvider.getTimestamp(file.getPath().getName());
136
137        if (
138          !addressToLastBackupMap.containsKey(walServerAddress)
139            || addressToLastBackupMap.get(walServerAddress) >= walTimestamp
140        ) {
141          filteredFiles.add(file);
142        }
143      } catch (Exception ex) {
144        LOG.warn(
145          "Error occurred while filtering file: {} with error: {}. Ignoring cleanup of this log",
146          file.getPath(), ex.getMessage());
147      }
148    }
149
150    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
151      filteredFiles.size());
152    return filteredFiles;
153  }
154
155  @Override
156  public void setConf(Configuration config) {
157    // If backup is disabled, keep all members null
158    super.setConf(config);
159    if (
160      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
161        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
162    ) {
163      LOG.warn("Backup is disabled - allowing all wals to be deleted");
164    }
165  }
166
167  @Override
168  public void stop(String why) {
169    if (!this.stopped) {
170      this.stopped = true;
171      LOG.info("Stopping BackupLogCleaner");
172    }
173  }
174
175  @Override
176  public boolean isStopped() {
177    return this.stopped;
178  }
179}