001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.master; 019 020import java.io.IOException; 021import java.time.Duration; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.stream.Collectors; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseInterfaceAudience; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.backup.BackupInfo; 034import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 035import org.apache.hadoop.hbase.backup.impl.BackupManager; 036import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 037import org.apache.hadoop.hbase.backup.util.BackupBoundaries; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; 043import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 044import org.apache.hadoop.hbase.net.Address; 045import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils; 051import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; 052 053/** 054 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup 055 * before deleting it when its TTL is over. 056 */ 057@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 058public class BackupLogCleaner extends BaseLogCleanerDelegate { 059 private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class); 060 private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis(); 061 static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms"; 062 063 private boolean stopped = false; 064 private Connection conn; 065 066 public BackupLogCleaner() { 067 } 068 069 @Override 070 public void init(Map<String, Object> params) { 071 MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER); 072 if (master != null) { 073 conn = master.getConnection(); 074 if (getConf() == null) { 075 super.setConf(conn.getConfiguration()); 076 } 077 } 078 if (conn == null) { 079 try { 080 conn = ConnectionFactory.createConnection(getConf()); 081 } catch (IOException ioe) { 082 throw new RuntimeException("Failed to create connection", ioe); 083 } 084 } 085 } 086 087 /** 088 * Calculates the timestamp boundary up to which all backup roots have already included the WAL. 089 * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental 090 * backups. 091 */ 092 private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable) 093 throws IOException { 094 List<BackupInfo> backups = sysTable.getBackupHistory(true); 095 if (LOG.isDebugEnabled()) { 096 LOG.debug( 097 "Cleaning WALs if they are older than the WAL cleanup time-boundary. " 098 + "Checking WALs against {} backups: {}", 099 backups.size(), 100 backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", "))); 101 } 102 103 // This map tracks, for every backup root, the most recent created backup (= highest timestamp) 104 Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>(); 105 for (BackupInfo backup : backups) { 106 BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir()); 107 if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) { 108 newestBackupPerRootDir.put(backup.getBackupRootDir(), backup); 109 } 110 } 111 112 if (LOG.isDebugEnabled()) { 113 LOG.debug("WAL cleanup time-boundary using info from: {}. ", 114 newestBackupPerRootDir.entrySet().stream() 115 .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted() 116 .collect(Collectors.joining(", "))); 117 } 118 119 BackupBoundaries.BackupBoundariesBuilder builder = 120 BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT)); 121 for (BackupInfo backupInfo : newestBackupPerRootDir.values()) { 122 long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir())); 123 // Iterate over all tables in the timestamp map, which contains all tables covered in the 124 // backup root, not just the tables included in that specific backup (which could be a subset) 125 for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) { 126 for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table) 127 .entrySet()) { 128 builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode); 129 } 130 } 131 } 132 133 BackupBoundaries boundaries = builder.build(); 134 135 if (LOG.isDebugEnabled()) { 136 LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode()); 137 for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) { 138 LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(), 139 entry.getValue()); 140 } 141 } 142 143 return boundaries; 144 } 145 146 @Override 147 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { 148 List<FileStatus> filteredFiles = new ArrayList<>(); 149 150 // all members of this class are null if backup is disabled, 151 // so we cannot filter the files 152 if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) { 153 LOG.debug("Backup is not enabled. Check your {} setting", 154 BackupRestoreConstants.BACKUP_ENABLE_KEY); 155 return files; 156 } 157 158 BackupBoundaries boundaries; 159 try { 160 try (BackupSystemTable sysTable = new BackupSystemTable(conn)) { 161 boundaries = serverToPreservationBoundaryTs(sysTable); 162 } 163 } catch (IOException ex) { 164 LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs", 165 ex.getMessage(), ex); 166 return Collections.emptyList(); 167 } 168 for (FileStatus file : files) { 169 if (canDeleteFile(boundaries, file.getPath())) { 170 filteredFiles.add(file); 171 } 172 } 173 174 LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files), 175 filteredFiles.size()); 176 return filteredFiles; 177 } 178 179 @Override 180 public void setConf(Configuration config) { 181 // If backup is disabled, keep all members null 182 super.setConf(config); 183 if ( 184 !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, 185 BackupRestoreConstants.BACKUP_ENABLE_DEFAULT) 186 ) { 187 LOG.warn("Backup is disabled - allowing all wals to be deleted"); 188 } 189 } 190 191 @Override 192 public void stop(String why) { 193 if (!this.stopped) { 194 this.stopped = true; 195 LOG.info("Stopping BackupLogCleaner"); 196 } 197 } 198 199 @Override 200 public boolean isStopped() { 201 return this.stopped; 202 } 203 204 protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) { 205 if (isHMasterWAL(path)) { 206 return true; 207 } 208 return boundaries.isDeletable(path); 209 } 210 211 private static boolean isHMasterWAL(Path path) { 212 String fn = path.getName(); 213 return fn.startsWith(WALProcedureStore.LOG_PREFIX) 214 || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX) 215 || path.toString().contains("/%s/".formatted(MasterRegionFactory.MASTER_STORE_DIR)); 216 } 217}