/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.master;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
 * before deleting it when its TTL is over.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
  private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis();
  static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms";
  // Backup states that are relevant for the WAL preservation boundary. Hoisted to a constant so
  // the history filter predicate does not allocate a fresh EnumSet per BackupInfo tested.
  private static final EnumSet<BackupState> RELEVANT_STATES =
    EnumSet.of(BackupState.COMPLETE, BackupState.RUNNING);

  private boolean stopped = false;
  private Connection conn;
  // True only when this cleaner created the connection itself (in init()). A connection handed to
  // us by the master is shared and must never be closed here.
  private boolean connectionOwned = false;

  public BackupLogCleaner() {
  }

  /**
   * Initializes the cleaner. Prefers the shared connection of the hosting master (when present in
   * {@code params}); otherwise creates and owns a dedicated connection from the configuration.
   * @param params cleaner parameters; may contain the master instance under {@link HMaster#MASTER}
   */
  @Override
  public void init(Map<String, Object> params) {
    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
    if (master != null) {
      conn = master.getConnection();
      if (getConf() == null) {
        super.setConf(conn.getConfiguration());
      }
    }
    if (conn == null) {
      try {
        conn = ConnectionFactory.createConnection(getConf());
        // We created this connection, so we are responsible for closing it in stop().
        connectionOwned = true;
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create connection", ioe);
      }
    }
  }

  /**
   * Calculates the timestamp boundary up to which all backup roots have already included the WAL.
   * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
   * backups.
   * @param backups all completed or running backups to use for the calculation of the boundary
   * @param tsBuffer a buffer (in ms) to lower the boundary for the default bound
   */
  protected static BackupBoundaries calculatePreservationBoundary(List<BackupInfo> backups,
    long tsBuffer) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
          + "Checking WALs against {} backups: {}",
        backups.size(),
        backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
    }

    // This map tracks, for every backup root, the most recent (= highest timestamp) completed
    // backup, or if there is no such one, the currently running backup (if any)
    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
    for (BackupInfo backup : backups) {
      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
      // A COMPLETE backup always wins over a RUNNING one; among COMPLETE backups the first seen
      // entry is kept (history is expected to be ordered newest-first).
      if (existingEntry == null || existingEntry.getState() == BackupState.RUNNING) {
        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("WAL cleanup time-boundary using info from: {}. ",
        newestBackupPerRootDir.entrySet().stream()
          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
          .collect(Collectors.joining(", ")));
    }

    BackupBoundaries.BackupBoundariesBuilder builder = BackupBoundaries.builder(tsBuffer);
    newestBackupPerRootDir.values().forEach(builder::update);
    BackupBoundaries boundaries = builder.build();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Boundaries defaultBoundary: {}", boundaries.getDefaultBoundary());
      for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
          entry.getValue());
      }
    }

    return boundaries;
  }

  /**
   * Filters the given WAL files down to those that are safe to delete, i.e. that are no longer
   * needed by any completed or currently running backup. If the backup history cannot be read, no
   * file is reported deletable (fail-safe: retain everything).
   * @param files candidate WAL files
   * @return the subset of {@code files} that may be deleted
   */
  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> filteredFiles = new ArrayList<>();

    // all members of this class are null if backup is disabled,
    // so we cannot filter the files
    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
      LOG.debug("Backup is not enabled. Check your {} setting",
        BackupRestoreConstants.BACKUP_ENABLE_KEY);
      return files;
    }

    BackupBoundaries boundaries;
    try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
      long tsBuffer = getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT);
      List<BackupInfo> backupHistory =
        sysTable.getBackupHistory(i -> RELEVANT_STATES.contains(i.getState()));
      boundaries = calculatePreservationBoundary(backupHistory, tsBuffer);
    } catch (IOException ex) {
      // Fail safe: when the history is unreadable we must not delete anything.
      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
      return Collections.emptyList();
    }
    for (FileStatus file : files) {
      if (canDeleteFile(boundaries, file.getPath())) {
        filteredFiles.add(file);
      }
    }

    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
      filteredFiles.size());
    return filteredFiles;
  }

  @Override
  public void setConf(Configuration config) {
    // If backup is disabled, keep all members null
    super.setConf(config);
    if (
      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      LOG.warn("Backup is disabled - allowing all wals to be deleted");
    }
  }

  /**
   * Stops the cleaner and releases the connection if this cleaner created it. A master-provided
   * connection is shared and is intentionally left open.
   */
  @Override
  public void stop(String why) {
    if (!this.stopped) {
      this.stopped = true;
      LOG.info("Stopping BackupLogCleaner");
      if (connectionOwned && conn != null) {
        try {
          conn.close();
        } catch (IOException e) {
          // Best effort on shutdown; nothing the caller can do about it.
          LOG.warn("Failed to close connection", e);
        }
      }
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  /**
   * Returns whether the given WAL file may be deleted. HMaster-internal WALs are never needed for
   * backups and are always deletable; all others are checked against the preservation boundaries.
   */
  protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) {
    if (isHMasterWAL(path)) {
      return true;
    }
    return boundaries.isDeletable(path);
  }

  // Recognizes WALs belonging to the HMaster itself (procedure-store logs, archived master-region
  // WALs, or anything under the master store directory) — these are irrelevant for backups.
  private static boolean isHMasterWAL(Path path) {
    String fn = path.getName();
    return fn.startsWith(WALProcedureStore.LOG_PREFIX)
      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
      || path.toString().contains("/%s/".formatted(MasterRegionFactory.MASTER_STORE_DIR));
  }
}